// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package les implements the Light Ethereum Subprotocol.
package les

import (
	"io"
	"math"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
)

// freeClientPool implements a client database that limits the connection time
// of each client and manages accepting/rejecting incoming connections and even
// kicking out some connected clients. The pool calculates recent usage time
// for each known client (a value that increases linearly when the client is
// connected and decreases exponentially when not connected). Clients with lower
// recent usage are preferred, unknown nodes have the highest priority. Already
// connected nodes receive a small bias in their favor in order to avoid accepting
// and instantly kicking out clients.
//
// Note: the pool can use any string for client identification. Using signature
// keys for that purpose would not make sense when being known has a negative
// value for the client. Currently the LES protocol manager uses IP addresses
// (without port address) to identify clients.
type freeClientPool struct {
	db     ethdb.Database
	lock   sync.Mutex
	clock  mclock.Clock
	closed bool

	connectedLimit, totalLimit int

	addressMap            map[string]*freeClientPoolEntry
	connPool, disconnPool *prque.Prque
	startupTime           mclock.AbsTime
	logOffsetAtStartup    int64
}

const (
	recentUsageExpTC     = time.Hour   // time constant of the exponential weighting window for "recent" server usage
	fixedPointMultiplier = 0x1000000   // constant to convert logarithms to fixed point format
	connectedBias        = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
)

// newFreeClientPool creates a new free client pool
func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool {
	pool := &freeClientPool{
		db:             db,
		clock:          clock,
		addressMap:     make(map[string]*freeClientPoolEntry),
		connPool:       prque.New(poolSetIndex),
		disconnPool:    prque.New(poolSetIndex),
		connectedLimit: connectedLimit,
		totalLimit:     totalLimit,
	}
	pool.loadFromDb()
	return pool
}

func (f *freeClientPool) stop() {
	f.lock.Lock()
	f.closed = true
	f.saveToDb()
	f.lock.Unlock()
}

// connect should be called after a successful handshake. If the connection was
// rejected, there is no need to call disconnect.
//
// Note: the disconnectFn callback should not block.
func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.closed {
		return false
	}
	e := f.addressMap[address]
	now := f.clock.Now()
	var recentUsage int64
	if e == nil {
		e = &freeClientPoolEntry{address: address, index: -1}
		f.addressMap[address] = e
	} else {
		if e.connected {
			log.Debug("Client already connected", "address", address)
			return false
		}
		recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier))
	}
	e.linUsage = recentUsage - int64(now)
	// check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool
	if f.connPool.Size() == f.connectedLimit {
		i := f.connPool.PopItem().(*freeClientPoolEntry)
		if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
			// kick it out and accept the new client
			f.connPool.Remove(i.index)
			f.calcLogUsage(i, now)
			i.connected = false
			f.disconnPool.Push(i, -i.logUsage)
			log.Debug("Client kicked out", "address", i.address)
			i.disconnectFn()
		} else {
			// keep the old client and reject the new one
			f.connPool.Push(i, i.linUsage)
			log.Debug("Client rejected", "address", address)
			return false
		}
	}
	f.disconnPool.Remove(e.index)
	e.connected = true
	e.disconnectFn = disconnectFn
	f.connPool.Push(e, e.linUsage)
	if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
		f.disconnPool.Pop()
	}
	log.Debug("Client accepted", "address", address)
	return true
}

// disconnect should be called when a connection is terminated. If the disconnection
// was initiated by the pool itself using disconnectFn then calling disconnect is
// not necessary but permitted.
func (f *freeClientPool) disconnect(address string) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.closed {
		return
	}
	e := f.addressMap[address]
	now := f.clock.Now()
	if !e.connected {
		log.Debug("Client already disconnected", "address", address)
		return
	}

	f.connPool.Remove(e.index)
	f.calcLogUsage(e, now)
	e.connected = false
	f.disconnPool.Push(e, -e.logUsage)
	log.Debug("Client disconnected", "address", address)
}

// logOffset calculates the time-dependent offset for the logarithmic
// representation of recent usage
func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
	// Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor
	// is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier.
	logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier))
	return f.logOffsetAtStartup + logDecay
}

// calcLogUsage converts recent usage from linear to logarithmic representation
// when disconnecting a peer or closing the client pool
func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) {
	dt := e.linUsage + int64(now)
	if dt < 1 {
		dt = 1
	}
	e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now)
}

// freeClientPoolStorage is the RLP representation of the pool's database storage
type freeClientPoolStorage struct {
	LogOffset uint64
	List      []*freeClientPoolEntry
}

// loadFromDb restores pool status from the database storage
// (automatically called at initialization)
func (f *freeClientPool) loadFromDb() {
	enc, err := f.db.Get([]byte("freeClientPool"))
	if err != nil {
		return
	}
	var storage freeClientPoolStorage
	err = rlp.DecodeBytes(enc, &storage)
	if err != nil {
		log.Error("Failed to decode client list", "err", err)
		return
	}
	f.logOffsetAtStartup = int64(storage.LogOffset)
	f.startupTime = f.clock.Now()
	for _, e := range storage.List {
		log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage)
		f.addressMap[e.address] = e
		f.disconnPool.Push(e, -e.logUsage)
	}
}

// saveToDb saves pool status to the database storage
// (automatically called during shutdown)
func (f *freeClientPool) saveToDb() {
	now := f.clock.Now()
	storage := freeClientPoolStorage{
		LogOffset: uint64(f.logOffset(now)),
		List:      make([]*freeClientPoolEntry, len(f.addressMap)),
	}
	i := 0
	for _, e := range f.addressMap {
		if e.connected {
			f.calcLogUsage(e, now)
		}
		storage.List[i] = e
		i++
	}
	enc, err := rlp.EncodeToBytes(storage)
	if err != nil {
		log.Error("Failed to encode client list", "err", err)
	} else {
		f.db.Put([]byte("freeClientPool"), enc)
	}
}

// freeClientPoolEntry represents a client address known by the pool.
// When connected, recent usage is calculated as linUsage + int64(clock.Now())
// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset
// also grows linearly with time while the server is running.
// Conversion between linear and logarithmic representation happens when connecting
// or disconnecting the node.
//
// Note: linUsage and logUsage are values used with constantly growing offsets so
// even though they are close to each other at any time they may wrap around int64
// limits over time. Comparison should be performed accordingly.
type freeClientPoolEntry struct {
	address            string
	connected          bool
	disconnectFn       func()
	linUsage, logUsage int64
	index              int
}

func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error {
	return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)})
}

func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error {
	var entry struct {
		Address  string
		LogUsage uint64
	}
	if err := s.Decode(&entry); err != nil {
		return err
	}
	e.address = entry.Address
	e.logUsage = int64(entry.LogUsage)
	e.connected = false
	e.index = -1
	return nil
}

// poolSetIndex callback is used by both priority queues to set/update the index of
// the element in the queue. Index is needed to remove elements other than the top one.
func poolSetIndex(a interface{}, i int) {
	a.(*freeClientPoolEntry).index = i
}