aboutsummaryrefslogblamecommitdiffstats
path: root/les/api.go
blob: 3a8d49ca5e0f1b588e9471d70fa62c829d59df02 (plain) (tree)




















                                                                                  
             




                                                        
                                                       










































































                                                                                                                             
                                                     














                                                                                    
                                                     



























                                                                                 

                                                           



















                                                                                                            
                                                                                                                                                   
                                   





                                                                             














                                                                                  
                                                                                                                                 






                                                                 
                                                                                           









                                            

                                                                                                                   














                                                                                                     
                                                                                                                                   







                                            
                                                                      









































































                                                                                                                                                        
                                                                                                                          

                                                            
                                                                                      






















































                                                                                                
                                                                      

















                                                                                                     


                                                                                                              













































































                                                                                                                                       
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package les

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common/hexutil"
    "github.com/ethereum/go-ethereum/common/mclock"
    "github.com/ethereum/go-ethereum/les/csvlogger"
    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/ethereum/go-ethereum/rpc"
)

var (
    ErrMinCap               = errors.New("capacity too small")
    ErrTotalCap             = errors.New("total capacity exceeded")
    ErrUnknownBenchmarkType = errors.New("unknown benchmark type")

    dropCapacityDelay = time.Second // delay applied to decreasing capacity changes
)

// PrivateLightServerAPI provides an API to access the LES light server.
// It offers only methods that operate on public data that is freely available to anyone.
type PrivateLightServerAPI struct {
    server *LesServer
}

// NewPrivateLightServerAPI creates a new LES light server API.
func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
    return &PrivateLightServerAPI{
        server: server,
    }
}

// TotalCapacity queries total available capacity for all clients
func (api *PrivateLightServerAPI) TotalCapacity() hexutil.Uint64 {
    return hexutil.Uint64(api.server.priorityClientPool.totalCapacity())
}

// SubscribeTotalCapacity subscribes to changed total capacity events.
// If onlyUnderrun is true then notification is sent only if the total capacity
// drops under the total capacity of connected priority clients.
//
// Note: actually applying decreasing total capacity values is delayed while the
// notification is sent instantly. This allows lowering the capacity of a priority client
// or choosing which one to drop before the system drops some of them automatically.
func (api *PrivateLightServerAPI) SubscribeTotalCapacity(ctx context.Context, onlyUnderrun bool) (*rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
    }
    rpcSub := notifier.CreateSubscription()
    api.server.priorityClientPool.subscribeTotalCapacity(&tcSubscription{notifier, rpcSub, onlyUnderrun})
    return rpcSub, nil
}

type (
    // tcSubscription represents a total capacity subscription
    tcSubscription struct {
        notifier     *rpc.Notifier
        rpcSub       *rpc.Subscription
        onlyUnderrun bool
    }
    tcSubs map[*tcSubscription]struct{}
)

// send sends a changed total capacity event to the subscribers
func (s tcSubs) send(tc uint64, underrun bool) {
    for sub := range s {
        select {
        case <-sub.rpcSub.Err():
            delete(s, sub)
        case <-sub.notifier.Closed():
            delete(s, sub)
        default:
            if underrun || !sub.onlyUnderrun {
                sub.notifier.Notify(sub.rpcSub.ID, tc)
            }
        }
    }
}

// MinimumCapacity queries minimum assignable capacity for a single client
func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 {
    return hexutil.Uint64(api.server.minCapacity)
}

// FreeClientCapacity queries the capacity provided for free clients
func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 {
    return hexutil.Uint64(api.server.freeClientCap)
}

// SetClientCapacity sets the priority capacity assigned to a given client.
// If the assigned capacity is bigger than zero then connection is always
// guaranteed. The sum of capacity assigned to priority clients can not exceed
// the total available capacity.
//
// Note: assigned capacity can be changed while the client is connected with
// immediate effect.
func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error {
    if cap != 0 && cap < api.server.minCapacity {
        return ErrMinCap
    }
    return api.server.priorityClientPool.setClientCapacity(id, cap)
}

// GetClientCapacity returns the capacity assigned to a given client
func (api *PrivateLightServerAPI) GetClientCapacity(id enode.ID) hexutil.Uint64 {
    api.server.priorityClientPool.lock.Lock()
    defer api.server.priorityClientPool.lock.Unlock()

    return hexutil.Uint64(api.server.priorityClientPool.clients[id].cap)
}

// clientPool is implemented by both the free and priority client pools
type clientPool interface {
    peerSetNotify
    setLimits(count int, totalCap uint64)
}

// priorityClientPool stores information about prioritized clients
type priorityClientPool struct {
    lock                             sync.Mutex
    child                            clientPool
    ps                               *peerSet
    clients                          map[enode.ID]priorityClientInfo
    totalCap, totalCapAnnounced      uint64
    totalConnectedCap, freeClientCap uint64
    maxPeers, priorityCount          int
    logger                           *csvlogger.Logger
    logTotalPriConn                  *csvlogger.Channel

    subs            tcSubs
    updateSchedule  []scheduledUpdate
    scheduleCounter uint64
}

// scheduledUpdate represents a delayed total capacity update
type scheduledUpdate struct {
    time         mclock.AbsTime
    totalCap, id uint64
}

// priorityClientInfo entries exist for all prioritized clients and currently connected non-priority clients
type priorityClientInfo struct {
    cap       uint64 // zero for non-priority clients
    connected bool
    peer      *peer
}

// newPriorityClientPool creates a new priority client pool
func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool {
    return &priorityClientPool{
        clients:         make(map[enode.ID]priorityClientInfo),
        freeClientCap:   freeClientCap,
        ps:              ps,
        child:           child,
        logger:          eventLogger,
        logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0),
    }
}

// registerPeer is called when a new client is connected. If the client has no
// priority assigned then it is passed to the child pool which may either keep it
// or disconnect it.
//
// Note: priorityClientPool also stores a record about free clients while they are
// connected in order to be able to assign priority to them later.
func (v *priorityClientPool) registerPeer(p *peer) {
    v.lock.Lock()
    defer v.lock.Unlock()

    id := p.ID()
    c := v.clients[id]
    v.logger.Event(fmt.Sprintf("priorityClientPool: registerPeer  cap=%d  connected=%v, %x", c.cap, c.connected, id.Bytes()))
    if c.connected {
        return
    }
    if c.cap == 0 && v.child != nil {
        v.child.registerPeer(p)
    }
    if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap {
        v.logger.Event(fmt.Sprintf("priorityClientPool: rejected, %x", id.Bytes()))
        go v.ps.Unregister(p.id)
        return
    }

    c.connected = true
    c.peer = p
    v.clients[id] = c
    if c.cap != 0 {
        v.priorityCount++
        v.totalConnectedCap += c.cap
        v.logger.Event(fmt.Sprintf("priorityClientPool: accepted with %d capacity, %x", c.cap, id.Bytes()))
        v.logTotalPriConn.Update(float64(v.totalConnectedCap))
        if v.child != nil {
            v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
        }
        p.updateCapacity(c.cap)
    }
}

// unregisterPeer is called when a client is disconnected. If the client has no
// priority assigned then it is also removed from the child pool.
func (v *priorityClientPool) unregisterPeer(p *peer) {
    v.lock.Lock()
    defer v.lock.Unlock()

    id := p.ID()
    c := v.clients[id]
    v.logger.Event(fmt.Sprintf("priorityClientPool: unregisterPeer  cap=%d  connected=%v, %x", c.cap, c.connected, id.Bytes()))
    if !c.connected {
        return
    }
    if c.cap != 0 {
        c.connected = false
        v.clients[id] = c
        v.priorityCount--
        v.totalConnectedCap -= c.cap
        v.logTotalPriConn.Update(float64(v.totalConnectedCap))
        if v.child != nil {
            v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
        }
    } else {
        if v.child != nil {
            v.child.unregisterPeer(p)
        }
        delete(v.clients, id)
    }
}

// setLimits updates the allowed peer count and total capacity of the priority
// client pool. Since the free client pool is a child of the priority pool the
// remaining peer count and capacity is assigned to the free pool by calling its
// own setLimits function.
//
// Note: a decreasing change of the total capacity is applied with a delay.
func (v *priorityClientPool) setLimits(count int, totalCap uint64) {
    v.lock.Lock()
    defer v.lock.Unlock()

    v.totalCapAnnounced = totalCap
    if totalCap > v.totalCap {
        v.setLimitsNow(count, totalCap)
        v.subs.send(totalCap, false)
        return
    }
    v.setLimitsNow(count, v.totalCap)
    if totalCap < v.totalCap {
        v.subs.send(totalCap, totalCap < v.totalConnectedCap)
        for i, s := range v.updateSchedule {
            if totalCap >= s.totalCap {
                s.totalCap = totalCap
                v.updateSchedule = v.updateSchedule[:i+1]
                return
            }
        }
        v.updateSchedule = append(v.updateSchedule, scheduledUpdate{time: mclock.Now() + mclock.AbsTime(dropCapacityDelay), totalCap: totalCap})
        if len(v.updateSchedule) == 1 {
            v.scheduleCounter++
            id := v.scheduleCounter
            v.updateSchedule[0].id = id
            time.AfterFunc(dropCapacityDelay, func() { v.checkUpdate(id) })
        }
    } else {
        v.updateSchedule = nil
    }
}

// checkUpdate performs the next scheduled update if possible and schedules
// the one after that
func (v *priorityClientPool) checkUpdate(id uint64) {
    v.lock.Lock()
    defer v.lock.Unlock()

    if len(v.updateSchedule) == 0 || v.updateSchedule[0].id != id {
        return
    }
    v.setLimitsNow(v.maxPeers, v.updateSchedule[0].totalCap)
    v.updateSchedule = v.updateSchedule[1:]
    if len(v.updateSchedule) != 0 {
        v.scheduleCounter++
        id := v.scheduleCounter
        v.updateSchedule[0].id = id
        dt := time.Duration(v.updateSchedule[0].time - mclock.Now())
        time.AfterFunc(dt, func() { v.checkUpdate(id) })
    }
}

// setLimits updates the allowed peer count and total capacity immediately
func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) {
    if v.priorityCount > count || v.totalConnectedCap > totalCap {
        for id, c := range v.clients {
            if c.connected {
                v.logger.Event(fmt.Sprintf("priorityClientPool: setLimitsNow kicked out, %x", id.Bytes()))
                c.connected = false
                v.totalConnectedCap -= c.cap
                v.logTotalPriConn.Update(float64(v.totalConnectedCap))
                v.priorityCount--
                v.clients[id] = c
                go v.ps.Unregister(c.peer.id)
                if v.priorityCount <= count && v.totalConnectedCap <= totalCap {
                    break
                }
            }
        }
    }
    v.maxPeers = count
    v.totalCap = totalCap
    if v.child != nil {
        v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
    }
}

// totalCapacity queries total available capacity for all clients
func (v *priorityClientPool) totalCapacity() uint64 {
    v.lock.Lock()
    defer v.lock.Unlock()

    return v.totalCapAnnounced
}

// subscribeTotalCapacity subscribes to changed total capacity events
func (v *priorityClientPool) subscribeTotalCapacity(sub *tcSubscription) {
    v.lock.Lock()
    defer v.lock.Unlock()

    v.subs[sub] = struct{}{}
}

// setClientCapacity sets the priority capacity assigned to a given client
func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
    v.lock.Lock()
    defer v.lock.Unlock()

    c := v.clients[id]
    if c.cap == cap {
        return nil
    }
    if c.connected {
        if v.totalConnectedCap+cap > v.totalCap+c.cap {
            return ErrTotalCap
        }
        if c.cap == 0 {
            if v.child != nil {
                v.child.unregisterPeer(c.peer)
            }
            v.priorityCount++
        }
        if cap == 0 {
            v.priorityCount--
        }
        v.totalConnectedCap += cap - c.cap
        v.logTotalPriConn.Update(float64(v.totalConnectedCap))
        if v.child != nil {
            v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
        }
        if cap == 0 {
            if v.child != nil {
                v.child.registerPeer(c.peer)
            }
            c.peer.updateCapacity(v.freeClientCap)
        } else {
            c.peer.updateCapacity(cap)
        }
    }
    if cap != 0 || c.connected {
        c.cap = cap
        v.clients[id] = c
    } else {
        delete(v.clients, id)
    }
    if c.connected {
        v.logger.Event(fmt.Sprintf("priorityClientPool: changed capacity to %d, %x", cap, id.Bytes()))
    }
    return nil
}

// Benchmark runs a request performance benchmark with a given set of measurement setups
// in multiple passes specified by passCount. The measurement time for each setup in each
// pass is specified in milliseconds by length.
//
// Note: measurement time is adjusted for each pass depending on the previous ones.
// Therefore a controlled total measurement time is achievable in multiple passes.
func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) {
    benchmarks := make([]requestBenchmark, len(setups))
    for i, setup := range setups {
        if t, ok := setup["type"].(string); ok {
            getInt := func(field string, def int) int {
                if value, ok := setup[field].(float64); ok {
                    return int(value)
                }
                return def
            }
            getBool := func(field string, def bool) bool {
                if value, ok := setup[field].(bool); ok {
                    return value
                }
                return def
            }
            switch t {
            case "header":
                benchmarks[i] = &benchmarkBlockHeaders{
                    amount:  getInt("amount", 1),
                    skip:    getInt("skip", 1),
                    byHash:  getBool("byHash", false),
                    reverse: getBool("reverse", false),
                }
            case "body":
                benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false}
            case "receipts":
                benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true}
            case "proof":
                benchmarks[i] = &benchmarkProofsOrCode{code: false}
            case "code":
                benchmarks[i] = &benchmarkProofsOrCode{code: true}
            case "cht":
                benchmarks[i] = &benchmarkHelperTrie{
                    bloom:    false,
                    reqCount: getInt("amount", 1),
                }
            case "bloom":
                benchmarks[i] = &benchmarkHelperTrie{
                    bloom:    true,
                    reqCount: getInt("amount", 1),
                }
            case "txSend":
                benchmarks[i] = &benchmarkTxSend{}
            case "txStatus":
                benchmarks[i] = &benchmarkTxStatus{}
            default:
                return nil, ErrUnknownBenchmarkType
            }
        } else {
            return nil, ErrUnknownBenchmarkType
        }
    }
    rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
    result := make([]map[string]interface{}, len(setups))
    for i, r := range rs {
        res := make(map[string]interface{})
        if r.err == nil {
            res["totalCount"] = r.totalCount
            res["avgTime"] = r.avgTime
            res["maxInSize"] = r.maxInSize
            res["maxOutSize"] = r.maxOutSize
        } else {
            res["error"] = r.err.Error()
        }
        result[i] = res
    }
    return result, nil
}