aboutsummaryrefslogtreecommitdiffstats
path: root/les
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2016-11-30 13:02:08 +0800
committerZsolt Felfoldi <zsfelfoldi@gmail.com>2016-12-10 16:53:08 +0800
commitaf8a742d00f9d47b832f6f2d50a8e1c89bbf8441 (patch)
tree92fca6d9750797a9c5f1fc712e2de1181be60390 /les
parente67500aa15a2f51a96f0ae91ab3af898b81d82f2 (diff)
downloaddexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar
dexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.gz
dexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.bz2
dexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.lz
dexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.xz
dexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.zst
dexon-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.zip
les: improved header fetcher and server statistics
Diffstat (limited to 'les')
-rw-r--r--les/fetcher.go754
-rw-r--r--les/handler.go53
-rw-r--r--les/helper_test.go11
-rw-r--r--les/odr.go55
-rw-r--r--les/odr_peerset.go120
-rw-r--r--les/odr_test.go8
-rw-r--r--les/peer.go70
-rw-r--r--les/request_test.go7
-rw-r--r--les/serverpool.go204
9 files changed, 801 insertions, 481 deletions
diff --git a/les/fetcher.go b/les/fetcher.go
index 24cf51839..0c8be9e2a 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -23,173 +23,416 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/light"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
)
+const (
+ blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others
+ maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer
+)
+
+// lightFetcher
type lightFetcher struct {
pm *ProtocolManager
odr *LesOdr
- chain BlockChain
-
- headAnnouncedMu sync.Mutex
- headAnnouncedBy map[common.Hash][]*peer
- currentTd *big.Int
- deliverChn chan fetchResponse
- reqMu sync.RWMutex
- requested map[uint64]fetchRequest
- timeoutChn chan uint64
- notifyChn chan bool // true if initiated from outside
- syncing bool
- syncDone chan struct{}
+ chain *light.LightChain
+
+ maxConfirmedTd *big.Int
+ peers map[*peer]*fetcherPeerInfo
+ lastUpdateStats *updateStatsEntry
+
+ lock sync.Mutex // qwerqwerqwe
+ deliverChn chan fetchResponse
+ reqMu sync.RWMutex
+ requested map[uint64]fetchRequest
+ timeoutChn chan uint64
+ requestChn chan bool // true if initiated from outside
+ syncing bool
+ syncDone chan *peer
+}
+
+// fetcherPeerInfo holds fetcher-specific information about each active peer
+type fetcherPeerInfo struct {
+ root, lastAnnounced *fetcherTreeNode
+ nodeCnt int
+ confirmedTd *big.Int
+ bestConfirmed *fetcherTreeNode
+ nodeByHash map[common.Hash]*fetcherTreeNode
+ firstUpdateStats *updateStatsEntry
}
+// fetcherTreeNode is a node of a tree that holds information about blocks recently
+// announced and confirmed by a certain peer. Each new announce message from a peer
+// adds nodes to the tree, based on the previous announced head and the reorg depth.
+// There are three possible states for a tree node:
+// - announced: not downloaded (known) yet, but we know its head, number and td
+// - intermediate: not known, hash and td are empty, they are filled out when it becomes known
+// - known: both announced by this peer and downloaded (from any peer).
+// This structure makes it possible to always know which peer has a certain block,
+// which is necessary for selecting a suitable peer for ODR requests and also for
+// canonizing new heads. It also helps to always download the minimum necessary
+// amount of headers with a single request.
+type fetcherTreeNode struct {
+ hash common.Hash
+ number uint64
+ td *big.Int
+ known, requested bool
+ parent *fetcherTreeNode
+ children []*fetcherTreeNode
+}
+
+// fetchRequest represents a header download request
type fetchRequest struct {
- hash common.Hash
- amount uint64
- peer *peer
+ hash common.Hash
+ amount uint64
+ peer *peer
+ sent mclock.AbsTime
+ timeout bool
}
+// fetchResponse represents a header download response
type fetchResponse struct {
reqID uint64
headers []*types.Header
peer *peer
}
+// newLightFetcher creates a new light fetcher
func newLightFetcher(pm *ProtocolManager) *lightFetcher {
f := &lightFetcher{
- pm: pm,
- chain: pm.blockchain,
- odr: pm.odr,
- headAnnouncedBy: make(map[common.Hash][]*peer),
- deliverChn: make(chan fetchResponse, 100),
- requested: make(map[uint64]fetchRequest),
- timeoutChn: make(chan uint64),
- notifyChn: make(chan bool, 100),
- syncDone: make(chan struct{}),
- currentTd: big.NewInt(0),
+ pm: pm,
+ chain: pm.blockchain.(*light.LightChain),
+ odr: pm.odr,
+ peers: make(map[*peer]*fetcherPeerInfo),
+ deliverChn: make(chan fetchResponse, 100),
+ requested: make(map[uint64]fetchRequest),
+ timeoutChn: make(chan uint64),
+ requestChn: make(chan bool, 100),
+ syncDone: make(chan *peer),
+ maxConfirmedTd: big.NewInt(0),
}
go f.syncLoop()
return f
}
-func (f *lightFetcher) notify(p *peer, head *announceData) {
- var headHash common.Hash
- if head == nil {
- // initial notify
- headHash = p.Head()
- } else {
- if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil {
- head.haveHeaders = head.Number
- }
- //fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders)
- if !p.addNotify(head) {
- //fmt.Println("addNotify fail")
- f.pm.removePeer(p.id)
+// syncLoop is the main event loop of the light fetcher
+func (f *lightFetcher) syncLoop() {
+ f.pm.wg.Add(1)
+ defer f.pm.wg.Done()
+
+ requestStarted := false
+ for {
+ select {
+ case <-f.pm.quitSync:
+ return
+ // when a new announce is received, request loop keeps running until
+ // no further requests are necessary or possible
+ case newAnnounce := <-f.requestChn:
+ f.lock.Lock()
+ s := requestStarted
+ requestStarted = false
+ if !f.syncing && !(newAnnounce && s) {
+ if peer, node, amount := f.nextRequest(); node != nil {
+ requestStarted = true
+ reqID, started := f.request(peer, node, amount)
+ if started {
+ go func() {
+ time.Sleep(softRequestTimeout)
+ f.reqMu.Lock()
+ req, ok := f.requested[reqID]
+ if ok {
+ req.timeout = true
+ f.requested[reqID] = req
+ }
+ f.reqMu.Unlock()
+ // keep starting new requests while possible
+ f.requestChn <- false
+ }()
+ }
+ }
+ }
+ f.lock.Unlock()
+ case reqID := <-f.timeoutChn:
+ f.reqMu.Lock()
+ req, ok := f.requested[reqID]
+ if ok {
+ delete(f.requested, reqID)
+ }
+ f.reqMu.Unlock()
+ if ok {
+ f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
+ glog.V(logger.Debug).Infof("hard timeout by peer %v", req.peer.id)
+ go f.pm.removePeer(req.peer.id)
+ }
+ case resp := <-f.deliverChn:
+ f.reqMu.Lock()
+ req, ok := f.requested[resp.reqID]
+ if ok && req.peer != resp.peer {
+ ok = false
+ }
+ if ok {
+ delete(f.requested, resp.reqID)
+ }
+ f.reqMu.Unlock()
+ if ok {
+ f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
+ }
+ f.lock.Lock()
+ if !ok || !(f.syncing || f.processResponse(req, resp)) {
+ glog.V(logger.Debug).Infof("failed processing response by peer %v", resp.peer.id)
+ go f.pm.removePeer(resp.peer.id)
+ }
+ f.lock.Unlock()
+ case p := <-f.syncDone:
+ f.lock.Lock()
+ glog.V(logger.Debug).Infof("done synchronising with peer %v", p.id)
+ f.checkSyncedHeaders(p)
+ f.syncing = false
+ f.lock.Unlock()
}
- headHash = head.Hash
}
- f.headAnnouncedMu.Lock()
- f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p)
- f.headAnnouncedMu.Unlock()
- f.notifyChn <- true
}
-func (f *lightFetcher) gotHeader(header *types.Header) {
- f.headAnnouncedMu.Lock()
- defer f.headAnnouncedMu.Unlock()
+// addPeer adds a new peer to the fetcher's peer set
+func (f *lightFetcher) addPeer(p *peer) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
+}
- hash := header.Hash()
- peerList := f.headAnnouncedBy[hash]
- if peerList == nil {
+// removePeer removes a new peer from the fetcher's peer set
+func (f *lightFetcher) removePeer(p *peer) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ // check for potential timed out block delay statistics
+ f.checkUpdateStats(p, nil)
+ delete(f.peers, p)
+}
+
+// announce processes a new announcement message received from a peer, adding new
+// nodes to the peer's block tree and removing old nodes if necessary
+func (f *lightFetcher) announce(p *peer, head *announceData) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ glog.V(logger.Debug).Infof("received announce from peer %v #%d %016x reorg: %d", p.id, head.Number, head.Hash[:8], head.ReorgDepth)
+
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("announce: unknown peer")
+ return
+ }
+
+ if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
+ // announced tds should be strictly monotonic
+ glog.V(logger.Debug).Infof("non-monotonic Td from peer %v", p.id)
+ go f.pm.removePeer(p.id)
return
}
- number := header.Number.Uint64()
- td := core.GetTd(f.pm.chainDb, hash, number)
- for _, peer := range peerList {
- peer.lock.Lock()
- ok := peer.gotHeader(hash, number, td)
- peer.lock.Unlock()
- if !ok {
- //fmt.Println("gotHeader fail")
- f.pm.removePeer(peer.id)
+
+ n := fp.lastAnnounced
+ for i := uint64(0); i < head.ReorgDepth; i++ {
+ if n == nil {
+ break
}
+ n = n.parent
}
- delete(f.headAnnouncedBy, hash)
-}
+ if n != nil {
+ // n is now the reorg common ancestor, add a new branch of nodes
+ // check if the node count is too high to add new nodes
+ locked := false
+ for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
+ if !locked {
+ f.chain.LockChain()
+ defer f.chain.UnlockChain()
+ locked = true
+ }
+ // if one of root's children is canonical, keep it, delete other branches and root itself
+ var newRoot *fetcherTreeNode
+ for i, nn := range fp.root.children {
+ if core.GetCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
+ fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
+ nn.parent = nil
+ newRoot = nn
+ break
+ }
+ }
+ fp.deleteNode(fp.root)
+ if n == fp.root {
+ n = newRoot
+ }
+ fp.root = newRoot
+ if newRoot == nil || !f.checkKnownNode(p, newRoot) {
+ fp.bestConfirmed = nil
+ fp.confirmedTd = nil
+ }
-func (f *lightFetcher) nextRequest() (*peer, *announceData) {
- var bestPeer *peer
- bestTd := f.currentTd
- for _, peer := range f.pm.peers.AllPeers() {
- peer.lock.RLock()
- if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 ||
- (bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) {
- bestPeer = peer
- bestTd = peer.headInfo.Td
+ if n == nil {
+ break
+ }
}
- peer.lock.RUnlock()
- }
- if bestPeer == nil {
- return nil, nil
- }
- bestPeer.lock.Lock()
- res := bestPeer.headInfo
- res.requested = true
- bestPeer.lock.Unlock()
- for _, peer := range f.pm.peers.AllPeers() {
- if peer != bestPeer {
- peer.lock.Lock()
- if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders {
- peer.headInfo.requested = true
+ if n != nil {
+ for n.number < head.Number {
+ nn := &fetcherTreeNode{number: n.number + 1, parent: n}
+ n.children = append(n.children, nn)
+ n = nn
+ fp.nodeCnt++
}
- peer.lock.Unlock()
+ n.hash = head.Hash
+ n.td = head.Td
+ fp.nodeByHash[n.hash] = n
}
}
- return bestPeer, res
-}
+ if n == nil {
+ // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
+ if fp.root != nil {
+ fp.deleteNode(fp.root)
+ }
+ n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
+ fp.root = n
+ fp.nodeCnt++
+ fp.nodeByHash[n.hash] = n
+ fp.bestConfirmed = nil
+ fp.confirmedTd = nil
+ }
-func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
- f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
+ f.checkKnownNode(p, n)
+ p.lock.Lock()
+ p.headInfo = head
+ fp.lastAnnounced = n
+ p.lock.Unlock()
+ f.checkUpdateStats(p, nil)
+ f.requestChn <- true
}
-func (f *lightFetcher) requestedID(reqID uint64) bool {
- f.reqMu.RLock()
- _, ok := f.requested[reqID]
- f.reqMu.RUnlock()
- return ok
+// peerHasBlock returns true if we can assume the peer knows the given block
+// based on its announcements
+func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
+ f.lock.Lock()
+ defer f.lock.Lock()
+
+ fp := f.peers[p]
+ if fp == nil || fp.root == nil {
+ return false
+ }
+
+ if number >= fp.root.number {
+ // it is recent enough that if it is known, is should be in the peer's block tree
+ return fp.nodeByHash[hash] != nil
+ }
+ f.chain.LockChain()
+ defer f.chain.UnlockChain()
+ // if it's older than the peer's block tree root but it's in the same canonical chain
+ // than the root, we can still be sure the peer knows it
+ return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
}
-func (f *lightFetcher) request(p *peer, block *announceData) {
- //fmt.Println("request", p.id, block.Number, block.haveHeaders)
- amount := block.Number - block.haveHeaders
- if amount == 0 {
- return
+// request initiates a header download request from a certain peer
+func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) {
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("request: unknown peer")
+ return 0, false
}
- if amount > 100 {
+ if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
f.syncing = true
go func() {
- //fmt.Println("f.pm.synchronise(p)")
+ glog.V(logger.Debug).Infof("synchronising with peer %v", p.id)
f.pm.synchronise(p)
- //fmt.Println("sync done")
- f.syncDone <- struct{}{}
+ f.syncDone <- p
}()
- return
+ return 0, false
}
reqID := f.odr.getNextReqID()
- f.reqMu.Lock()
- f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p}
- f.reqMu.Unlock()
+ n.requested = true
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
p.fcServer.SendRequest(reqID, cost)
- go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true)
+ f.reqMu.Lock()
+ f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
+ f.reqMu.Unlock()
+ go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
go func() {
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
+ return reqID, true
+}
+
+// requestAmount calculates the amount of headers to be downloaded starting
+// from a certain head backwards
+func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
+ amount := uint64(0)
+ nn := n
+ for nn != nil && !f.checkKnownNode(p, nn) {
+ nn = nn.parent
+ amount++
+ }
+ if nn == nil {
+ amount = n.number
+ }
+ return amount
+}
+
+// requestedID tells if a certain reqID has been requested by the fetcher
+func (f *lightFetcher) requestedID(reqID uint64) bool {
+ f.reqMu.RLock()
+ _, ok := f.requested[reqID]
+ f.reqMu.RUnlock()
+ return ok
+}
+
+// nextRequest selects the peer and announced head to be requested next, amount
+// to be downloaded starting from the head backwards is also returned
+func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
+ var (
+ bestHash common.Hash
+ bestAmount uint64
+ )
+ bestTd := f.maxConfirmedTd
+
+ for p, fp := range f.peers {
+ for hash, n := range fp.nodeByHash {
+ if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) {
+ amount := f.requestAmount(p, n)
+ if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount {
+ bestHash = hash
+ bestAmount = amount
+ bestTd = n.td
+ }
+ }
+ }
+ }
+ if bestTd == f.maxConfirmedTd {
+ return nil, nil, 0
+ }
+
+ peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ fp := f.peers[p]
+ if fp == nil || fp.nodeByHash[bestHash] == nil {
+ return false, 0
+ }
+ return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
+ })
+ var node *fetcherTreeNode
+ if peer != nil {
+ node = f.peers[peer].nodeByHash[bestHash]
+ }
+ return peer, node, bestAmount
}
+// deliverHeaders delivers header download request responses for processing
+func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
+ f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
+}
+
+// processResponse processes header download request responses
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
return false
@@ -201,101 +444,248 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
return false
}
- for _, header := range headers {
- td := core.GetTd(f.pm.chainDb, header.Hash(), header.Number.Uint64())
+ tds := make([]*big.Int, len(headers))
+ for i, header := range headers {
+ td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
if td == nil {
return false
}
- if td.Cmp(f.currentTd) > 0 {
- f.currentTd = td
- }
- f.gotHeader(header)
+ tds[i] = td
}
+ f.newHeaders(headers, tds)
return true
}
-func (f *lightFetcher) checkSyncedHeaders() {
- //fmt.Println("checkSyncedHeaders()")
- for _, peer := range f.pm.peers.AllPeers() {
- peer.lock.Lock()
- h := peer.firstHeadInfo
- remove := false
- loop:
- for h != nil {
- if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil {
- //fmt.Println(" found", h.Number)
- ok := peer.gotHeader(h.Hash, h.Number, td)
- if !ok {
- remove = true
- break loop
- }
- if td.Cmp(f.currentTd) > 0 {
- f.currentTd = td
- }
- }
- h = h.next
+// newHeaders updates the block trees of all active peers according to a newly
+// downloaded and validated batch or headers
+func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
+ var maxTd *big.Int
+ for p, fp := range f.peers {
+ if !f.checkAnnouncedHeaders(fp, headers, tds) {
+ glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
+ go f.pm.removePeer(p.id)
}
- peer.lock.Unlock()
- if remove {
- //fmt.Println("checkSync fail")
- f.pm.removePeer(peer.id)
+ if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
+ maxTd = fp.confirmedTd
}
}
+ if maxTd != nil {
+ f.updateMaxConfirmedTd(maxTd)
+ }
}
-func (f *lightFetcher) syncLoop() {
- f.pm.wg.Add(1)
- defer f.pm.wg.Done()
+// checkAnnouncedHeaders updates peer's block tree if necessary after validating
+// a batch of headers. It searches for the latest header in the batch that has a
+// matching tree node (if any), and if it has not been marked as known already,
+// sets it and its parents to known (even those which are older than the currently
+// validated ones). Return value shows if all hashes, numbers and Tds matched
+// correctly to the announced values (otherwise the peer should be dropped).
+func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
+ var (
+ n *fetcherTreeNode
+ header *types.Header
+ td *big.Int
+ )
- srtoNotify := false
- for {
- select {
- case <-f.pm.quitSync:
- return
- case ext := <-f.notifyChn:
- //fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify)
- s := srtoNotify
- srtoNotify = false
- if !f.syncing && !(ext && s) {
- if p, r := f.nextRequest(); r != nil {
- srtoNotify = true
- go func() {
- time.Sleep(softRequestTimeout)
- f.notifyChn <- false
- }()
- f.request(p, r)
+ for i := len(headers) - 1; ; i-- {
+ if i < 0 {
+ if n == nil {
+ // no more headers and nothing to match
+ return true
+ }
+ // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
+ td = f.chain.GetTd(header.ParentHash, header.Number.Uint64()-1)
+ header = f.chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
+ } else {
+ header = headers[i]
+ td = tds[i]
+ }
+ hash := header.Hash()
+ number := header.Number.Uint64()
+ if n == nil {
+ n = fp.nodeByHash[hash]
+ }
+ if n != nil {
+ if n.td == nil {
+ // node was unannounced
+ if nn := fp.nodeByHash[hash]; nn != nil {
+ // if there was already a node with the same hash, continue there and drop this one
+ nn.children = append(nn.children, n.children...)
+ n.children = nil
+ fp.deleteNode(n)
+ n = nn
+ } else {
+ n.hash = hash
+ n.td = td
+ fp.nodeByHash[hash] = n
}
}
- case reqID := <-f.timeoutChn:
- f.reqMu.Lock()
- req, ok := f.requested[reqID]
- if ok {
- delete(f.requested, reqID)
+ // check if it matches the header
+ if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
+ // peer has previously made an invalid announcement
+ return false
}
- f.reqMu.Unlock()
- if ok {
- //fmt.Println("hard timeout")
- f.pm.removePeer(req.peer.id)
+ if n.known {
+ // we reached a known node that matched our expectations, return with success
+ return true
}
- case resp := <-f.deliverChn:
- //fmt.Println("<-f.deliverChn", f.syncing)
- f.reqMu.Lock()
- req, ok := f.requested[resp.reqID]
- if ok && req.peer != resp.peer {
- ok = false
+ n.known = true
+ if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
+ fp.confirmedTd = td
+ fp.bestConfirmed = n
}
- if ok {
- delete(f.requested, resp.reqID)
+ n = n.parent
+ if n == nil {
+ return true
}
- f.reqMu.Unlock()
- if !ok || !(f.syncing || f.processResponse(req, resp)) {
- //fmt.Println("processResponse fail")
- f.pm.removePeer(resp.peer.id)
+ }
+ }
+}
+
+// checkSyncedHeaders updates peer's block tree after synchronisation by marking
+// downloaded headers as known. If none of the announced headers are found after
+// syncing, the peer is dropped.
+func (f *lightFetcher) checkSyncedHeaders(p *peer) {
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("checkSyncedHeaders: unknown peer")
+ return
+ }
+ n := fp.lastAnnounced
+ var td *big.Int
+ for n != nil {
+ if td = f.chain.GetTd(n.hash, n.number); td != nil {
+ break
+ }
+ n = n.parent
+ }
+ // now n is the latest downloaded header after syncing
+ if n == nil {
+ glog.V(logger.Debug).Infof("synchronisation failed with peer %v", p.id)
+ go f.pm.removePeer(p.id)
+ } else {
+ header := f.chain.GetHeader(n.hash, n.number)
+ f.newHeaders([]*types.Header{header}, []*big.Int{td})
+ }
+}
+
+// checkKnownNode checks if a block tree node is known (downloaded and validated)
+// If it was not known previously but found in the database, sets its known flag
+func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
+ if n.known {
+ return true
+ }
+ td := f.chain.GetTd(n.hash, n.number)
+ if td == nil {
+ return false
+ }
+
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("checkKnownNode: unknown peer")
+ return false
+ }
+ header := f.chain.GetHeader(n.hash, n.number)
+ if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
+ glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
+ go f.pm.removePeer(p.id)
+ }
+ if fp.confirmedTd != nil {
+ f.updateMaxConfirmedTd(fp.confirmedTd)
+ }
+ return n.known
+}
+
+// deleteNode deletes a node and its child subtrees from a peer's block tree
+func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
+ if n.parent != nil {
+ for i, nn := range n.parent.children {
+ if nn == n {
+ n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
+ break
}
- case <-f.syncDone:
- //fmt.Println("<-f.syncDone", f.syncing)
- f.checkSyncedHeaders()
- f.syncing = false
+ }
+ }
+ for {
+ if n.td != nil {
+ delete(fp.nodeByHash, n.hash)
+ }
+ fp.nodeCnt--
+ if len(n.children) == 0 {
+ return
+ }
+ for i, nn := range n.children {
+ if i == 0 {
+ n = nn
+ } else {
+ fp.deleteNode(nn)
+ }
+ }
+ }
+}
+
+// updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
+// than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
+// and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated
+// both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
+// and it has also been downloaded from any peer, either before or after the given announcement).
+// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
+// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
+// the current global head).
+type updateStatsEntry struct {
+ time mclock.AbsTime
+ td *big.Int
+ next *updateStatsEntry
+}
+
+// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
+// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
+// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
+// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
+// positive block delay value.
+func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
+ if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
+ f.maxConfirmedTd = td
+ newEntry := &updateStatsEntry{
+ time: mclock.Now(),
+ td: td,
+ }
+ if f.lastUpdateStats != nil {
+ f.lastUpdateStats.next = newEntry
+ }
+ f.lastUpdateStats = newEntry
+ for p, _ := range f.peers {
+ f.checkUpdateStats(p, newEntry)
+ }
+ }
+}
+
+// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
+// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
+// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
+// the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
+// items are removed from the head of the linked list.
+// If a new entry has been added to the global tail, it is passed as a parameter here even though this function
+// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
+// it can set the new head to newEntry.
+func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
+ now := mclock.Now()
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("checkUpdateStats: unknown peer")
+ return
+ }
+ if newEntry != nil && fp.firstUpdateStats == nil {
+ fp.firstUpdateStats = newEntry
+ }
+ for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
+ f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
+ fp.firstUpdateStats = fp.firstUpdateStats.next
+ }
+ if fp.confirmedTd != nil {
+ for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
+ f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
+ fp.firstUpdateStats = fp.firstUpdateStats.next
}
}
}
diff --git a/les/handler.go b/les/handler.go
index f1a8bc62c..4535aaeb9 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -24,10 +24,8 @@ import (
"math/big"
"net"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -60,7 +58,7 @@ const (
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request
- disableClientRemovePeer = true
+ disableClientRemovePeer = false
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry
+ peer := manager.newPeer(int(version), networkId, p, rw)
if manager.serverPool != nil {
addr := p.RemoteAddr().(*net.TCPAddr)
- entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port))
+ entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
if entry == nil {
return fmt.Errorf("unwanted connection")
}
}
- peer := manager.newPeer(int(version), networkId, p, rw)
peer.poolEntry = entry
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
- start := mclock.Now()
err := manager.handle(peer)
if entry != nil {
- connTime := time.Duration(mclock.Now() - start)
- stopped := false
- select {
- case <-manager.quitSync:
- stopped = true
- default:
- }
- //fmt.Println("connTime", peer.id, connTime, stopped, err)
- quality := float64(1)
- setQuality := true
- if connTime < time.Minute*10 {
- quality = 0
- if stopped {
- setQuality = false
- }
- }
- manager.serverPool.disconnect(entry, quality, setQuality)
+ manager.serverPool.disconnect(entry)
}
return err
case <-manager.quitSync:
if entry != nil {
- manager.serverPool.disconnect(entry, 0, false)
+ manager.serverPool.disconnect(entry)
}
return p2p.DiscQuitting
}
@@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
- manager.fetcher = newLightFetcher(manager)
}
if odr != nil {
@@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
if pm.lightSync {
pm.downloader.UnregisterPeer(id)
- pm.odr.UnregisterPeer(peer)
if pm.txrelay != nil {
pm.txrelay.removePeer(id)
}
+ if pm.fetcher != nil {
+ pm.fetcher.removePeer(peer)
+ }
}
if err := pm.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err)
@@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
// start sync handler
- if srvr != nil {
+ if srvr != nil { // srvr is nil during testing
pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
+ pm.odr.serverPool = pm.serverPool
+ pm.fetcher = newLightFetcher(pm)
}
go pm.syncer()
} else {
@@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
return err
}
- pm.odr.RegisterPeer(p)
if pm.txrelay != nil {
pm.txrelay.addPeer(p)
}
- pm.fetcher.notify(p, nil)
+ p.lock.Lock()
+ head := p.headInfo
+ p.lock.Unlock()
+ if pm.fetcher != nil {
+ pm.fetcher.addPeer(p)
+ pm.fetcher.announce(p, head)
+ }
if p.poolEntry != nil {
pm.serverPool.registered(p.poolEntry)
@@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
- pm.fetcher.notify(p, &req)
+ if pm.fetcher != nil {
+ go pm.fetcher.announce(p, &req)
+ }
case GetBlockHeadersMsg:
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.GotReply(resp.ReqID, resp.BV)
- if pm.fetcher.requestedID(resp.ReqID) {
+ if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
diff --git a/les/helper_test.go b/les/helper_test.go
index a5428e954..2df3faab2 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -25,6 +25,7 @@ import (
"math/big"
"sync"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -334,3 +335,13 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
func (p *testPeer) close() {
p.app.Close()
}
+
+type testServerPool peer
+
+func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer {
+ return (*peer)(p)
+}
+
+func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
+
+}
diff --git a/les/odr.go b/les/odr.go
index 444b1da2a..10ef928df 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -37,6 +37,11 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+type odrPeerSelector interface {
+ selectPeer(func(*peer) (bool, uint64)) *peer
+ adjustResponseTime(*poolEntry, time.Duration, bool)
+}
+
type LesOdr struct {
light.OdrBackend
db ethdb.Database
@@ -44,7 +49,7 @@ type LesOdr struct {
removePeer peerDropFn
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
- peers *odrPeerSet
+ serverPool odrPeerSelector
lastReqID uint64
}
@@ -52,7 +57,6 @@ func NewLesOdr(db ethdb.Database) *LesOdr {
return &LesOdr{
db: db,
stop: make(chan struct{}),
- peers: newOdrPeerSet(),
sentReqs: make(map[uint64]*sentReq),
}
}
@@ -77,16 +81,6 @@ type sentReq struct {
answered chan struct{} // closed and set to nil when any peer answers it
}
-// RegisterPeer registers a new LES peer to the ODR capable peer set
-func (self *LesOdr) RegisterPeer(p *peer) error {
- return self.peers.register(p)
-}
-
-// UnregisterPeer removes a peer from the ODR capable peer set
-func (self *LesOdr) UnregisterPeer(p *peer) {
- self.peers.unregister(p)
-}
-
const (
MsgBlockBodies = iota
MsgCode
@@ -142,29 +136,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateTimeout(peer, false)
- self.peers.updateServTime(peer, servTime)
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
+ }
return
case <-time.After(softRequestTimeout):
close(timeout)
- if self.peers.updateTimeout(peer, true) {
- self.removePeer(peer.id)
- }
case <-self.stop:
return
}
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateServTime(peer, servTime)
- return
case <-time.After(hardRequestTimeout):
- self.removePeer(peer.id)
+ go self.removePeer(peer.id)
case <-self.stop:
return
}
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
+ }
}
// networkRequest sends a request to known peers until an answer is received
@@ -193,7 +184,13 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
exclude := make(map[*peer]struct{})
for {
- if peer := self.peers.bestPeer(lreq, exclude); peer == nil {
+ var p *peer
+ if self.serverPool != nil {
+ p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ return true, p.fcServer.CanSend(lreq.GetCost(p))
+ })
+ }
+ if p == nil {
select {
case <-ctx.Done():
return ctx.Err()
@@ -202,17 +199,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
case <-time.After(retryPeers):
}
} else {
- exclude[peer] = struct{}{}
+ exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout := make(chan struct{})
req.lock.Lock()
- req.sentTo[peer] = delivered
+ req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
- cost := lreq.GetCost(peer)
- peer.fcServer.SendRequest(reqID, cost)
- go self.requestPeer(req, peer, delivered, timeout, reqWg)
- lreq.Request(reqID, peer)
+ cost := lreq.GetCost(p)
+ p.fcServer.SendRequest(reqID, cost)
+ go self.requestPeer(req, p, delivered, timeout, reqWg)
+ lreq.Request(reqID, p)
select {
case <-ctx.Done():
diff --git a/les/odr_peerset.go b/les/odr_peerset.go
deleted file mode 100644
index e9b7eec7f..000000000
--- a/les/odr_peerset.go
+++ /dev/null
@@ -1,120 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package les
-
-import (
- "sync"
-)
-
-const dropTimeoutRatio = 20
-
-type odrPeerInfo struct {
- reqTimeSum, reqTimeCnt, reqCnt, timeoutCnt uint64
-}
-
-// odrPeerSet represents the collection of active peer participating in the block
-// download procedure.
-type odrPeerSet struct {
- peers map[*peer]*odrPeerInfo
- lock sync.RWMutex
-}
-
-// newPeerSet creates a new peer set top track the active download sources.
-func newOdrPeerSet() *odrPeerSet {
- return &odrPeerSet{
- peers: make(map[*peer]*odrPeerInfo),
- }
-}
-
-// Register injects a new peer into the working set, or returns an error if the
-// peer is already known.
-func (ps *odrPeerSet) register(p *peer) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[p]; ok {
- return errAlreadyRegistered
- }
- ps.peers[p] = &odrPeerInfo{}
- return nil
-}
-
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
-func (ps *odrPeerSet) unregister(p *peer) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[p]; !ok {
- return errNotRegistered
- }
- delete(ps.peers, p)
- return nil
-}
-
-func (ps *odrPeerSet) peerPriority(p *peer, info *odrPeerInfo, req LesOdrRequest) uint64 {
- tm := p.fcServer.CanSend(req.GetCost(p))
- if info.reqTimeCnt > 0 {
- tm += info.reqTimeSum / info.reqTimeCnt
- }
- return tm
-}
-
-func (ps *odrPeerSet) bestPeer(req LesOdrRequest, exclude map[*peer]struct{}) *peer {
- var best *peer
- var bpv uint64
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- for p, info := range ps.peers {
- if _, ok := exclude[p]; !ok {
- pv := ps.peerPriority(p, info, req)
- if best == nil || pv < bpv {
- best = p
- bpv = pv
- }
- }
- }
- return best
-}
-
-func (ps *odrPeerSet) updateTimeout(p *peer, timeout bool) (drop bool) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if info, ok := ps.peers[p]; ok {
- info.reqCnt++
- if timeout {
- // check ratio before increase to allow an extra timeout
- if info.timeoutCnt*dropTimeoutRatio >= info.reqCnt {
- return true
- }
- info.timeoutCnt++
- }
- }
- return false
-}
-
-func (ps *odrPeerSet) updateServTime(p *peer, servTime uint64) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if info, ok := ps.peers[p]; ok {
- info.reqTimeSum += servTime
- info.reqTimeCnt++
- }
-}
diff --git a/les/odr_test.go b/les/odr_test.go
index 27e3b283d..685435996 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -160,6 +160,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
+ pool := (*testServerPool)(lpeer)
+ odr.serverPool = pool
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
@@ -188,13 +190,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
}
// temporarily remove peer to test odr fails
- odr.UnregisterPeer(lpeer)
+ odr.serverPool = nil
// expect retrievals to fail (except genesis block) without a les peer
test(expFail)
- odr.RegisterPeer(lpeer)
+ odr.serverPool = pool
// expect all retrievals to pass
test(5)
- odr.UnregisterPeer(lpeer)
+ odr.serverPool = nil
// still expect all retrievals to pass, now data should be cached locally
test(5)
}
diff --git a/les/peer.go b/les/peer.go
index 8eced439d..8ebbe3511 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -51,9 +51,8 @@ type peer struct {
id string
- firstHeadInfo, headInfo *announceData
- headInfoLen int
- lock sync.RWMutex
+ headInfo *announceData
+ lock sync.RWMutex
announceChn chan announceData
@@ -111,67 +110,6 @@ func (p *peer) headBlockInfo() blockInfo {
return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
}
-func (p *peer) addNotify(announce *announceData) bool {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- if announce.Td.Cmp(p.headInfo.Td) < 1 {
- return false
- }
- if p.headInfoLen >= maxHeadInfoLen {
- //return false
- p.firstHeadInfo = p.firstHeadInfo.next
- p.headInfoLen--
- }
- if announce.haveHeaders == 0 {
- hh := p.headInfo.Number - announce.ReorgDepth
- if p.headInfo.haveHeaders < hh {
- hh = p.headInfo.haveHeaders
- }
- announce.haveHeaders = hh
- }
- p.headInfo.next = announce
- p.headInfo = announce
- p.headInfoLen++
- return true
-}
-
-func (p *peer) gotHeader(hash common.Hash, number uint64, td *big.Int) bool {
- h := p.firstHeadInfo
- ptr := 0
- for h != nil {
- if h.Hash == hash {
- if h.Number != number || h.Td.Cmp(td) != 0 {
- return false
- }
- h.headKnown = true
- h.haveHeaders = h.Number
- p.firstHeadInfo = h
- p.headInfoLen -= ptr
- last := h
- h = h.next
- // propagate haveHeaders through the chain
- for h != nil {
- hh := last.Number - h.ReorgDepth
- if last.haveHeaders < hh {
- hh = last.haveHeaders
- }
- if hh > h.haveHeaders {
- h.haveHeaders = hh
- } else {
- return true
- }
- last = h
- h = h.next
- }
- return true
- }
- h = h.next
- ptr++
- }
- return true
-}
-
// Td retrieves the current total difficulty of a peer.
func (p *peer) Td() *big.Int {
p.lock.RLock()
@@ -455,9 +393,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
p.fcCosts = MRC.decode()
}
- p.firstHeadInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
- p.headInfo = p.firstHeadInfo
- p.headInfoLen = 1
+ p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
return nil
}
diff --git a/les/request_test.go b/les/request_test.go
index a6fbb06ce..03b946771 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -71,6 +71,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
+ pool := (*testServerPool)(lpeer)
+ odr.serverPool = pool
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
@@ -100,11 +102,10 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
}
// temporarily remove peer to test odr fails
- odr.UnregisterPeer(lpeer)
+ odr.serverPool = nil
// expect retrievals to fail (except genesis block) without a les peer
test(0)
- odr.RegisterPeer(lpeer)
+ odr.serverPool = pool
// expect all retrievals to pass
test(5)
- odr.UnregisterPeer(lpeer)
}
diff --git a/les/serverpool.go b/les/serverpool.go
index 95968eda2..f5e880460 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -59,6 +59,9 @@ const (
targetKnownSelect = 3
// after dialTimeout, consider the server unavailable and adjust statistics
dialTimeout = time.Second * 30
+ // targetConnTime is the minimum expected connection duration before a server
+ // drops a client without any specific reason
+ targetConnTime = time.Minute * 10
// new entry selection weight calculation based on most recent discovery time:
// unity until discoverExpireStart, then exponential decay with discoverExpireConst
discoverExpireStart = time.Minute * 20
@@ -75,6 +78,17 @@ const (
// node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
// each unsuccessful connection (restored after a successful one)
addrFailDropLn = math.Ln2
+ // responseScoreTC and delayScoreTC are exponential decay time constants for
+ // calculating selection chances from response times and block delay times
+ responseScoreTC = time.Millisecond * 100
+ delayScoreTC = time.Second * 5
+ timeoutPow = 10
+ // peerSelectMinWeight is added to calculated weights at request peer selection
+ // to give poorly performing peers a little chance of coming back
+ peerSelectMinWeight = 0.005
+ // initStatsWeight is used to initialize previously unknown peers with good
+ // statistics to give a chance to prove themselves
+ initStatsWeight = 1
)
// serverPool implements a pool for storing and selecting newly discovered and already
@@ -95,6 +109,7 @@ type serverPool struct {
entries map[discover.NodeID]*poolEntry
lock sync.Mutex
timeout, enableRetry chan *poolEntry
+ adjustStats chan poolStatAdjust
knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect
@@ -112,6 +127,7 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
wg: wg,
entries: make(map[discover.NodeID]*poolEntry),
timeout: make(chan *poolEntry, 1),
+ adjustStats: make(chan poolStatAdjust, 100),
enableRetry: make(chan *poolEntry, 1),
knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(),
@@ -139,18 +155,19 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
// Otherwise, the connection should be rejected.
// Note that whenever a connection has been accepted and a pool entry has been returned,
// disconnect should also always be called.
-func (pool *serverPool) connect(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
+func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
pool.lock.Lock()
defer pool.lock.Unlock()
- entry := pool.entries[id]
+ entry := pool.entries[p.ID()]
if entry == nil {
return nil
}
- glog.V(logger.Debug).Infof("connecting to %v, state: %v", id.String(), entry.state)
+ glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
if entry.state != psDialed {
return nil
}
pool.connWg.Add(1)
+ entry.peer = p
entry.state = psConnected
addr := &poolEntryAddress{
ip: ip,
@@ -172,42 +189,111 @@ func (pool *serverPool) registered(entry *poolEntry) {
defer pool.lock.Unlock()
entry.state = psRegistered
+ entry.regTime = mclock.Now()
if !entry.known {
pool.newQueue.remove(entry)
entry.known = true
}
pool.knownQueue.setLatest(entry)
entry.shortRetry = shortRetryCnt
- entry.connectStats.add(1)
}
// disconnect should be called when ending a connection. Service quality statistics
// can be updated optionally (not updated if no registration happened, in this case
// only connection statistics are updated, just like in case of timeout)
-func (pool *serverPool) disconnect(entry *poolEntry, quality float64, setQuality bool) {
+func (pool *serverPool) disconnect(entry *poolEntry) {
glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
pool.lock.Lock()
defer pool.lock.Unlock()
- if entry.state != psRegistered {
- setQuality = false
+ if entry.state == psRegistered {
+ connTime := mclock.Now() - entry.regTime
+ connAdjust := float64(connTime) / float64(targetConnTime)
+ if connAdjust > 1 {
+ connAdjust = 1
+ }
+ stopped := false
+ select {
+ case <-pool.quit:
+ stopped = true
+ default:
+ }
+ if stopped {
+ entry.connectStats.add(1, connAdjust)
+ } else {
+ entry.connectStats.add(connAdjust, 1)
+ }
}
+
entry.state = psNotConnected
if entry.knownSelected {
pool.knownSelected--
} else {
pool.newSelected--
}
- if setQuality {
- glog.V(logger.Debug).Infof("update quality %v %v", quality, entry.id.String())
- entry.qualityStats.add(quality)
- } else {
- glog.V(logger.Debug).Infof("do not update quality")
- }
pool.setRetryDial(entry)
pool.connWg.Done()
}
+const (
+ pseBlockDelay = iota
+ pseResponseTime
+ pseResponseTimeout
+)
+
+// poolStatAdjust records are sent to adjust peer block delay/response time statistics
+type poolStatAdjust struct {
+ adjustType int
+ entry *poolEntry
+ time time.Duration
+}
+
+// adjustBlockDelay adjusts the block announce delay statistics of a node
+func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
+ pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
+}
+
+// adjustResponseTime adjusts the request response time statistics of a node
+func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
+ if timeout {
+ pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
+ } else {
+ pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
+ }
+}
+
+type selectPeerItem struct {
+ peer *peer
+ weight int64
+}
+
+func (sp selectPeerItem) Weight() int64 {
+ return sp.weight
+}
+
+// selectPeer selects a suitable peer for a request
+func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
+ pool.lock.Lock()
+ defer pool.lock.Unlock()
+
+ sel := newWeightedRandomSelect()
+ for _, entry := range pool.entries {
+ if entry.state == psRegistered {
+ p := entry.peer
+ ok, cost := canSend(p)
+ if ok {
+ w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
+ sel.update(selectPeerItem{peer: p, weight: w})
+ }
+ }
+ }
+ choice := sel.choose()
+ if choice == nil {
+ return nil
+ }
+ return choice.(selectPeerItem).peer
+}
+
// eventLoop handles pool events and mutex locking for all internal functions
func (pool *serverPool) eventLoop() {
lookupCnt := 0
@@ -230,6 +316,19 @@ func (pool *serverPool) eventLoop() {
}
pool.lock.Unlock()
+ case adj := <-pool.adjustStats:
+ pool.lock.Lock()
+ switch adj.adjustType {
+ case pseBlockDelay:
+ adj.entry.delayStats.add(float64(adj.time), 1)
+ case pseResponseTime:
+ adj.entry.responseStats.add(float64(adj.time), 1)
+ adj.entry.timeoutStats.add(0, 1)
+ case pseResponseTimeout:
+ adj.entry.timeoutStats.add(1, 1)
+ }
+ pool.lock.Unlock()
+
case node := <-pool.discNodes:
pool.lock.Lock()
now := mclock.Now()
@@ -244,6 +343,11 @@ func (pool *serverPool) eventLoop() {
shortRetry: shortRetryCnt,
}
pool.entries[id] = entry
+ // initialize previously unknown peers with good statistics to give a chance to prove themselves
+ entry.connectStats.add(1, initStatsWeight)
+ entry.delayStats.add(0, initStatsWeight)
+ entry.responseStats.add(0, initStatsWeight)
+ entry.timeoutStats.add(0, initStatsWeight)
}
entry.lastDiscovered = now
addr := &poolEntryAddress{
@@ -298,9 +402,8 @@ func (pool *serverPool) loadNodes() {
glog.V(logger.Debug).Infof("node list decode error: %v", err)
return
}
- glog.V(logger.Debug).Infof("loaded node list")
for _, e := range list {
- glog.V(logger.Debug).Infof(" adding node %v fails: %v connStats sum: %v cnt: %v qualityStats sum: %v cnt: %v", e.id.String()+"@"+e.lastConnected.strKey(), e.lastConnected.fails, e.connectStats.sum, e.connectStats.cnt, e.qualityStats.sum, e.qualityStats.cnt)
+ glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight)
pool.entries[e.id] = e
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
@@ -433,7 +536,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
} else {
pool.newSelected--
}
- entry.connectStats.add(0)
+ entry.connectStats.add(0, 1)
entry.dialed.fails++
pool.setRetryDial(entry)
}
@@ -447,33 +550,36 @@ const (
// poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct {
+ peer *peer
id discover.NodeID
addr map[string]*poolEntryAddress
lastConnected, dialed *poolEntryAddress
addrSelect weightedRandomSelect
- lastDiscovered mclock.AbsTime
- known, knownSelected bool
- connectStats, qualityStats poolStats
- state int
- queueIdx int
- removed bool
+ lastDiscovered mclock.AbsTime
+ known, knownSelected bool
+ connectStats, delayStats poolStats
+ responseStats, timeoutStats poolStats
+ state int
+ regTime mclock.AbsTime
+ queueIdx int
+ removed bool
delayedRetry bool
shortRetry int
}
func (e *poolEntry) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.qualityStats})
+ return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
}
func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
var entry struct {
- ID discover.NodeID
- IP net.IP
- Port uint16
- Fails uint
- CStat, QStat poolStats
+ ID discover.NodeID
+ IP net.IP
+ Port uint16
+ Fails uint
+ CStat, DStat, RStat, TStat poolStats
}
if err := s.Decode(&entry); err != nil {
return err
@@ -486,7 +592,9 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
e.addrSelect.update(addr)
e.lastConnected = addr
e.connectStats = entry.CStat
- e.qualityStats = entry.QStat
+ e.delayStats = entry.DStat
+ e.responseStats = entry.RStat
+ e.timeoutStats = entry.TStat
e.shortRetry = shortRetryCnt
e.known = true
return nil
@@ -516,7 +624,7 @@ func (e *knownEntry) Weight() int64 {
if e.state != psNotConnected || !e.known || e.delayedRetry {
return 0
}
- return int64(1000000000 * e.connectStats.recentAvg() * (e.qualityStats.recentAvg() + 0.001) * math.Exp(-float64(e.lastConnected.fails)*failDropLn))
+ return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow))
}
// poolEntryAddress is a separate object because currently it is necessary to remember
@@ -544,18 +652,17 @@ func (a *poolEntryAddress) strKey() string {
// pstatRecentAdjust with each update and also returned exponentially to the
// average with the time constant pstatReturnToMeanTC
type poolStats struct {
- sum, avg, recent float64
- cnt uint
- lastRecalc mclock.AbsTime
+ sum, weight, avg, recent float64
+ lastRecalc mclock.AbsTime
}
// init initializes stats with a long term sum/update count pair retrieved from the database
-func (s *poolStats) init(sum float64, cnt uint) {
+func (s *poolStats) init(sum, weight float64) {
s.sum = sum
- s.cnt = cnt
+ s.weight = weight
var avg float64
- if cnt > 0 {
- avg = s.sum / float64(cnt)
+ if weight > 0 {
+ avg = s.sum / weight
}
s.avg = avg
s.recent = avg
@@ -566,16 +673,22 @@ func (s *poolStats) init(sum float64, cnt uint) {
func (s *poolStats) recalc() {
now := mclock.Now()
s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
- if s.cnt > 0 {
- s.avg = s.sum / float64(s.cnt)
+ if s.sum == 0 {
+ s.avg = 0
+ } else {
+ if s.sum > s.weight*1e30 {
+ s.avg = 1e30
+ } else {
+ s.avg = s.sum / s.weight
+ }
}
s.lastRecalc = now
}
// add updates the stats with a new value
-func (s *poolStats) add(val float64) {
- s.cnt++
- s.sum += val
+func (s *poolStats) add(value, weight float64) {
+ s.weight += weight
+ s.sum += value * weight
s.recalc()
}
@@ -586,18 +699,17 @@ func (s *poolStats) recentAvg() float64 {
}
func (s *poolStats) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), s.cnt})
+ return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
}
func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
var stats struct {
- SumUint uint64
- Cnt uint
+ SumUint, WeightUint uint64
}
if err := st.Decode(&stats); err != nil {
return err
}
- s.init(math.Float64frombits(stats.SumUint), stats.Cnt)
+ s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
return nil
}