aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/mclock/mclock.go31
-rw-r--r--common/mclock/simclock.go129
-rwxr-xr-xcommon/prque/prque.go57
-rwxr-xr-xcommon/prque/sstack.go106
-rw-r--r--les/freeclient.go278
-rw-r--r--les/freeclient_test.go139
-rw-r--r--les/handler.go22
7 files changed, 761 insertions, 1 deletions
diff --git a/common/mclock/mclock.go b/common/mclock/mclock.go
index 02608d17b..dcac59c6c 100644
--- a/common/mclock/mclock.go
+++ b/common/mclock/mclock.go
@@ -30,3 +30,34 @@ type AbsTime time.Duration
func Now() AbsTime {
return AbsTime(monotime.Now())
}
+
+// Add returns t + d.
+func (t AbsTime) Add(d time.Duration) AbsTime {
+ return t + AbsTime(d)
+}
+
+// Clock interface makes it possible to replace the monotonic system clock with
+// a simulated clock.
+type Clock interface {
+ Now() AbsTime
+ Sleep(time.Duration)
+ After(time.Duration) <-chan time.Time
+}
+
+// System implements Clock using the system clock.
+type System struct{}
+
+// Now implements Clock.
+func (System) Now() AbsTime {
+ return AbsTime(monotime.Now())
+}
+
+// Sleep implements Clock.
+func (System) Sleep(d time.Duration) {
+ time.Sleep(d)
+}
+
+// After implements Clock.
+func (System) After(d time.Duration) <-chan time.Time {
+ return time.After(d)
+}
diff --git a/common/mclock/simclock.go b/common/mclock/simclock.go
new file mode 100644
index 000000000..e014f5615
--- /dev/null
+++ b/common/mclock/simclock.go
@@ -0,0 +1,129 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package mclock
+
+import (
+ "sync"
+ "time"
+)
+
+// Simulated implements a virtual Clock for reproducible time-sensitive tests. It
+// simulates a scheduler on a virtual timescale where actual processing takes zero time.
+//
+// The virtual clock doesn't advance on its own, call Run to advance it and execute timers.
+// Since there is no way to influence the Go scheduler, testing timeout behaviour involving
+// goroutines needs special care. A good way to test such timeouts is as follows: First
+// perform the action that is supposed to time out. Ensure that the timer you want to test
+// is created. Then run the clock until after the timeout. Finally observe the effect of
+// the timeout using a channel or semaphore.
+type Simulated struct {
+ now AbsTime
+ scheduled []event
+ mu sync.RWMutex
+ cond *sync.Cond
+}
+
+type event struct {
+ do func()
+ at AbsTime
+}
+
+// Run moves the clock by the given duration, executing all timers before that duration.
+func (s *Simulated) Run(d time.Duration) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.init()
+
+ end := s.now + AbsTime(d)
+ for len(s.scheduled) > 0 {
+ ev := s.scheduled[0]
+ if ev.at > end {
+ break
+ }
+ s.now = ev.at
+ ev.do()
+ s.scheduled = s.scheduled[1:]
+ }
+ s.now = end
+}
+
+func (s *Simulated) ActiveTimers() int {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ return len(s.scheduled)
+}
+
+func (s *Simulated) WaitForTimers(n int) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.init()
+
+ for len(s.scheduled) < n {
+ s.cond.Wait()
+ }
+}
+
+// Now implements Clock.
+func (s *Simulated) Now() AbsTime {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ return s.now
+}
+
+// Sleep implements Clock.
+func (s *Simulated) Sleep(d time.Duration) {
+ <-s.After(d)
+}
+
+// After implements Clock.
+func (s *Simulated) After(d time.Duration) <-chan time.Time {
+ after := make(chan time.Time, 1)
+ s.insert(d, func() {
+ after <- (time.Time{}).Add(time.Duration(s.now))
+ })
+ return after
+}
+
+func (s *Simulated) insert(d time.Duration, do func()) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.init()
+
+ at := s.now + AbsTime(d)
+ l, h := 0, len(s.scheduled)
+ ll := h
+ for l != h {
+ m := (l + h) / 2
+ if at < s.scheduled[m].at {
+ h = m
+ } else {
+ l = m + 1
+ }
+ }
+ s.scheduled = append(s.scheduled, event{})
+ copy(s.scheduled[l+1:], s.scheduled[l:ll])
+ s.scheduled[l] = event{do: do, at: at}
+ s.cond.Broadcast()
+}
+
+func (s *Simulated) init() {
+ if s.cond == nil {
+ s.cond = sync.NewCond(&s.mu)
+ }
+}
diff --git a/common/prque/prque.go b/common/prque/prque.go
new file mode 100755
index 000000000..9fd31a2e5
--- /dev/null
+++ b/common/prque/prque.go
@@ -0,0 +1,57 @@
+// This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque".
+
+package prque
+
+import (
+ "container/heap"
+)
+
+// Priority queue data structure.
+type Prque struct {
+ cont *sstack
+}
+
+// Creates a new priority queue.
+func New(setIndex setIndexCallback) *Prque {
+ return &Prque{newSstack(setIndex)}
+}
+
+// Pushes a value with a given priority into the queue, expanding if necessary.
+func (p *Prque) Push(data interface{}, priority int64) {
+ heap.Push(p.cont, &item{data, priority})
+}
+
+// Pops the value with the greates priority off the stack and returns it.
+// Currently no shrinking is done.
+func (p *Prque) Pop() (interface{}, int64) {
+ item := heap.Pop(p.cont).(*item)
+ return item.value, item.priority
+}
+
+// Pops only the item from the queue, dropping the associated priority value.
+func (p *Prque) PopItem() interface{} {
+ return heap.Pop(p.cont).(*item).value
+}
+
+// Remove removes the element with the given index.
+func (p *Prque) Remove(i int) interface{} {
+ if i < 0 {
+ return nil
+ }
+ return heap.Remove(p.cont, i)
+}
+
+// Checks whether the priority queue is empty.
+func (p *Prque) Empty() bool {
+ return p.cont.Len() == 0
+}
+
+// Returns the number of element in the priority queue.
+func (p *Prque) Size() int {
+ return p.cont.Len()
+}
+
+// Clears the contents of the priority queue.
+func (p *Prque) Reset() {
+ *p = *New(p.cont.setIndex)
+}
diff --git a/common/prque/sstack.go b/common/prque/sstack.go
new file mode 100755
index 000000000..4875dae99
--- /dev/null
+++ b/common/prque/sstack.go
@@ -0,0 +1,106 @@
+// This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque".
+
+package prque
+
+// The size of a block of data
+const blockSize = 4096
+
+// A prioritized item in the sorted stack.
+//
+// Note: priorities can "wrap around" the int64 range, a comes before b if (a.priority - b.priority) > 0.
+// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63.
+type item struct {
+ value interface{}
+ priority int64
+}
+
+// setIndexCallback is called when the element is moved to a new index.
+// Providing setIndexCallback is optional, it is needed only if the application needs
+// to delete elements other than the top one.
+type setIndexCallback func(a interface{}, i int)
+
+// Internal sortable stack data structure. Implements the Push and Pop ops for
+// the stack (heap) functionality and the Len, Less and Swap methods for the
+// sortability requirements of the heaps.
+type sstack struct {
+ setIndex setIndexCallback
+ size int
+ capacity int
+ offset int
+
+ blocks [][]*item
+ active []*item
+}
+
+// Creates a new, empty stack.
+func newSstack(setIndex setIndexCallback) *sstack {
+ result := new(sstack)
+ result.setIndex = setIndex
+ result.active = make([]*item, blockSize)
+ result.blocks = [][]*item{result.active}
+ result.capacity = blockSize
+ return result
+}
+
+// Pushes a value onto the stack, expanding it if necessary. Required by
+// heap.Interface.
+func (s *sstack) Push(data interface{}) {
+ if s.size == s.capacity {
+ s.active = make([]*item, blockSize)
+ s.blocks = append(s.blocks, s.active)
+ s.capacity += blockSize
+ s.offset = 0
+ } else if s.offset == blockSize {
+ s.active = s.blocks[s.size/blockSize]
+ s.offset = 0
+ }
+ if s.setIndex != nil {
+ s.setIndex(data.(*item).value, s.size)
+ }
+ s.active[s.offset] = data.(*item)
+ s.offset++
+ s.size++
+}
+
+// Pops a value off the stack and returns it. Currently no shrinking is done.
+// Required by heap.Interface.
+func (s *sstack) Pop() (res interface{}) {
+ s.size--
+ s.offset--
+ if s.offset < 0 {
+ s.offset = blockSize - 1
+ s.active = s.blocks[s.size/blockSize]
+ }
+ res, s.active[s.offset] = s.active[s.offset], nil
+ if s.setIndex != nil {
+ s.setIndex(res.(*item).value, -1)
+ }
+ return
+}
+
+// Returns the length of the stack. Required by sort.Interface.
+func (s *sstack) Len() int {
+ return s.size
+}
+
+// Compares the priority of two elements of the stack (higher is first).
+// Required by sort.Interface.
+func (s *sstack) Less(i, j int) bool {
+ return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0
+}
+
+// Swaps two elements in the stack. Required by sort.Interface.
+func (s *sstack) Swap(i, j int) {
+ ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
+ a, b := s.blocks[jb][jo], s.blocks[ib][io]
+ if s.setIndex != nil {
+ s.setIndex(a.value, i)
+ s.setIndex(b.value, j)
+ }
+ s.blocks[ib][io], s.blocks[jb][jo] = a, b
+}
+
+// Resets the stack, effectively clearing its contents.
+func (s *sstack) Reset() {
+ *s = *newSstack(s.setIndex)
+}
diff --git a/les/freeclient.go b/les/freeclient.go
new file mode 100644
index 000000000..5ee607be8
--- /dev/null
+++ b/les/freeclient.go
@@ -0,0 +1,278 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+ "io"
+ "math"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// freeClientPool implements a client database that limits the connection time
+// of each client and manages accepting/rejecting incoming connections and even
+// kicking out some connected clients. The pool calculates recent usage time
+// for each known client (a value that increases linearly when the client is
+// connected and decreases exponentially when not connected). Clients with lower
+// recent usage are preferred, unknown nodes have the highest priority. Already
+// connected nodes receive a small bias in their favor in order to avoid accepting
+// and instantly kicking out clients.
+//
+// Note: the pool can use any string for client identification. Using signature
+// keys for that purpose would not make sense when being known has a negative
+// value for the client. Currently the LES protocol manager uses IP addresses
+// (without port address) to identify clients.
+type freeClientPool struct {
+ db ethdb.Database
+ lock sync.Mutex
+ clock mclock.Clock
+ closed bool
+
+ connectedLimit, totalLimit int
+
+ addressMap map[string]*freeClientPoolEntry
+ connPool, disconnPool *prque.Prque
+ startupTime mclock.AbsTime
+ logOffsetAtStartup int64
+}
+
+const (
+ recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage
+ fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
+ connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
+)
+
+// newFreeClientPool creates a new free client pool
+func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool {
+ pool := &freeClientPool{
+ db: db,
+ clock: clock,
+ addressMap: make(map[string]*freeClientPoolEntry),
+ connPool: prque.New(poolSetIndex),
+ disconnPool: prque.New(poolSetIndex),
+ connectedLimit: connectedLimit,
+ totalLimit: totalLimit,
+ }
+ pool.loadFromDb()
+ return pool
+}
+
+func (f *freeClientPool) stop() {
+ f.lock.Lock()
+ f.closed = true
+ f.saveToDb()
+ f.lock.Unlock()
+}
+
+// connect should be called after a successful handshake. If the connection was
+// rejected, there is no need to call disconnect.
+//
+// Note: the disconnectFn callback should not block.
+func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.closed {
+ return false
+ }
+ e := f.addressMap[address]
+ now := f.clock.Now()
+ var recentUsage int64
+ if e == nil {
+ e = &freeClientPoolEntry{address: address, index: -1}
+ f.addressMap[address] = e
+ } else {
+ if e.connected {
+ log.Debug("Client already connected", "address", address)
+ return false
+ }
+ recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier))
+ }
+ e.linUsage = recentUsage - int64(now)
+ // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool
+ if f.connPool.Size() == f.connectedLimit {
+ i := f.connPool.PopItem().(*freeClientPoolEntry)
+ if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
+ // kick it out and accept the new client
+ f.connPool.Remove(i.index)
+ f.calcLogUsage(i, now)
+ i.connected = false
+ f.disconnPool.Push(i, -i.logUsage)
+ log.Debug("Client kicked out", "address", i.address)
+ i.disconnectFn()
+ } else {
+ // keep the old client and reject the new one
+ f.connPool.Push(i, i.linUsage)
+ log.Debug("Client rejected", "address", address)
+ return false
+ }
+ }
+ f.disconnPool.Remove(e.index)
+ e.connected = true
+ e.disconnectFn = disconnectFn
+ f.connPool.Push(e, e.linUsage)
+ if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
+ f.disconnPool.Pop()
+ }
+ log.Debug("Client accepted", "address", address)
+ return true
+}
+
+// disconnect should be called when a connection is terminated. If the disconnection
+// was initiated by the pool itself using disconnectFn then calling disconnect is
+// not necessary but permitted.
+func (f *freeClientPool) disconnect(address string) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.closed {
+ return
+ }
+ e := f.addressMap[address]
+ now := f.clock.Now()
+ if !e.connected {
+ log.Debug("Client already disconnected", "address", address)
+ return
+ }
+
+ f.connPool.Remove(e.index)
+ f.calcLogUsage(e, now)
+ e.connected = false
+ f.disconnPool.Push(e, -e.logUsage)
+ log.Debug("Client disconnected", "address", address)
+}
+
+// logOffset calculates the time-dependent offset for the logarithmic
+// representation of recent usage
+func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
+ // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor
+ // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier.
+ logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier))
+ return f.logOffsetAtStartup + logDecay
+}
+
+// calcLogUsage converts recent usage from linear to logarithmic representation
+// when disconnecting a peer or closing the client pool
+func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) {
+ dt := e.linUsage + int64(now)
+ if dt < 1 {
+ dt = 1
+ }
+ e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now)
+}
+
+// freeClientPoolStorage is the RLP representation of the pool's database storage
+type freeClientPoolStorage struct {
+ LogOffset uint64
+ List []*freeClientPoolEntry
+}
+
+// loadFromDb restores pool status from the database storage
+// (automatically called at initialization)
+func (f *freeClientPool) loadFromDb() {
+ enc, err := f.db.Get([]byte("freeClientPool"))
+ if err != nil {
+ return
+ }
+ var storage freeClientPoolStorage
+ err = rlp.DecodeBytes(enc, &storage)
+ if err != nil {
+ log.Error("Failed to decode client list", "err", err)
+ return
+ }
+ f.logOffsetAtStartup = int64(storage.LogOffset)
+ f.startupTime = f.clock.Now()
+ for _, e := range storage.List {
+ log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage)
+ f.addressMap[e.address] = e
+ f.disconnPool.Push(e, -e.logUsage)
+ }
+}
+
+// saveToDb saves pool status to the database storage
+// (automatically called during shutdown)
+func (f *freeClientPool) saveToDb() {
+ now := f.clock.Now()
+ storage := freeClientPoolStorage{
+ LogOffset: uint64(f.logOffset(now)),
+ List: make([]*freeClientPoolEntry, len(f.addressMap)),
+ }
+ i := 0
+ for _, e := range f.addressMap {
+ if e.connected {
+ f.calcLogUsage(e, now)
+ }
+ storage.List[i] = e
+ i++
+ }
+ enc, err := rlp.EncodeToBytes(storage)
+ if err != nil {
+ log.Error("Failed to encode client list", "err", err)
+ } else {
+ f.db.Put([]byte("freeClientPool"), enc)
+ }
+}
+
+// freeClientPoolEntry represents a client address known by the pool.
+// When connected, recent usage is calculated as linUsage + int64(clock.Now())
+// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset
+// also grows linearly with time while the server is running.
+// Conversion between linear and logarithmic representation happens when connecting
+// or disconnecting the node.
+//
+// Note: linUsage and logUsage are values used with constantly growing offsets so
+// even though they are close to each other at any time they may wrap around int64
+// limits over time. Comparison should be performed accordingly.
+type freeClientPoolEntry struct {
+ address string
+ connected bool
+ disconnectFn func()
+ linUsage, logUsage int64
+ index int
+}
+
+func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)})
+}
+
+func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error {
+ var entry struct {
+ Address string
+ LogUsage uint64
+ }
+ if err := s.Decode(&entry); err != nil {
+ return err
+ }
+ e.address = entry.Address
+ e.logUsage = int64(entry.LogUsage)
+ e.connected = false
+ e.index = -1
+ return nil
+}
+
+// poolSetIndex callback is used by both priority queues to set/update the index of
+// the element in the queue. Index is needed to remove elements other than the top one.
+func poolSetIndex(a interface{}, i int) {
+ a.(*freeClientPoolEntry).index = i
+}
diff --git a/les/freeclient_test.go b/les/freeclient_test.go
new file mode 100644
index 000000000..e95abc7aa
--- /dev/null
+++ b/les/freeclient_test.go
@@ -0,0 +1,139 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/ethdb"
+)
+
+func TestFreeClientPoolL10C100(t *testing.T) {
+ testFreeClientPool(t, 10, 100)
+}
+
+func TestFreeClientPoolL40C200(t *testing.T) {
+ testFreeClientPool(t, 40, 200)
+}
+
+func TestFreeClientPoolL100C300(t *testing.T) {
+ testFreeClientPool(t, 100, 300)
+}
+
+const testFreeClientPoolTicks = 500000
+
+func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
+ var (
+ clock mclock.Simulated
+ db = ethdb.NewMemDatabase()
+ pool = newFreeClientPool(db, connLimit, 10000, &clock)
+ connected = make([]bool, clientCount)
+ connTicks = make([]int, clientCount)
+ disconnCh = make(chan int, clientCount)
+ )
+ peerId := func(i int) string {
+ return fmt.Sprintf("test peer #%d", i)
+ }
+ disconnFn := func(i int) func() {
+ return func() {
+ disconnCh <- i
+ }
+ }
+
+ // pool should accept new peers up to its connected limit
+ for i := 0; i < connLimit; i++ {
+ if pool.connect(peerId(i), disconnFn(i)) {
+ connected[i] = true
+ } else {
+ t.Fatalf("Test peer #%d rejected", i)
+ }
+ }
+ // since all accepted peers are new and should not be kicked out, the next one should be rejected
+ if pool.connect(peerId(connLimit), disconnFn(connLimit)) {
+ connected[connLimit] = true
+ t.Fatalf("Peer accepted over connected limit")
+ }
+
+ // randomly connect and disconnect peers, expect to have a similar total connection time at the end
+ for tickCounter := 0; tickCounter < testFreeClientPoolTicks; tickCounter++ {
+ clock.Run(1 * time.Second)
+
+ i := rand.Intn(clientCount)
+ if connected[i] {
+ pool.disconnect(peerId(i))
+ connected[i] = false
+ connTicks[i] += tickCounter
+ } else {
+ if pool.connect(peerId(i), disconnFn(i)) {
+ connected[i] = true
+ connTicks[i] -= tickCounter
+ }
+ }
+ pollDisconnects:
+ for {
+ select {
+ case i := <-disconnCh:
+ pool.disconnect(peerId(i))
+ if connected[i] {
+ connTicks[i] += tickCounter
+ connected[i] = false
+ }
+ default:
+ break pollDisconnects
+ }
+ }
+ }
+
+ expTicks := testFreeClientPoolTicks * connLimit / clientCount
+ expMin := expTicks - expTicks/10
+ expMax := expTicks + expTicks/10
+
+ // check if the total connected time of peers are all in the expected range
+ for i, c := range connected {
+ if c {
+ connTicks[i] += testFreeClientPoolTicks
+ }
+ if connTicks[i] < expMin || connTicks[i] > expMax {
+ t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], expMin, expMax)
+ }
+ }
+
+ // a previously unknown peer should be accepted now
+ if !pool.connect("newPeer", func() {}) {
+ t.Fatalf("Previously unknown peer rejected")
+ }
+
+ // close and restart pool
+ pool.stop()
+ pool = newFreeClientPool(db, connLimit, 10000, &clock)
+
+ // try connecting all known peers (connLimit should be filled up)
+ for i := 0; i < clientCount; i++ {
+ pool.connect(peerId(i), func() {})
+ }
+ // expect pool to remember known nodes and kick out one of them to accept a new one
+ if !pool.connect("newPeer2", func() {}) {
+ t.Errorf("Previously unknown peer rejected after restarting pool")
+ }
+ pool.stop()
+}
diff --git a/les/handler.go b/les/handler.go
index 2fc4cde34..91a235bf0 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -28,6 +28,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -104,6 +105,7 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ clientPool *freeClientPool
lesTopic discv5.Topic
reqDist *requestDistributor
retriever *retrieveManager
@@ -226,6 +228,7 @@ func (pm *ProtocolManager) Start(maxPeers int) {
if pm.lightSync {
go pm.syncer()
} else {
+ pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{})
go func() {
for range pm.newPeerCh {
}
@@ -243,6 +246,9 @@ func (pm *ProtocolManager) Stop() {
pm.noMorePeers <- struct{}{}
close(pm.quitSync) // quits syncer, fetcher
+ if pm.clientPool != nil {
+ pm.clientPool.stop()
+ }
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
@@ -264,7 +270,8 @@ func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgRea
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
- if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
+ // In server mode we try to check into the client pool after handshake
+ if pm.lightSync && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
@@ -282,6 +289,19 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
+
+ if !pm.lightSync && !p.Peer.Info().Network.Trusted {
+ addr, ok := p.RemoteAddr().(*net.TCPAddr)
+ // test peer address is not a tcp address, don't use client pool if can not typecast
+ if ok {
+ id := addr.IP.String()
+ if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) {
+ return p2p.DiscTooManyPeers
+ }
+ defer pm.clientPool.disconnect(id)
+ }
+ }
+
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}