// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package stream
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/swarm/log"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
opentracing "github.com/opentracing/opentracing-go"
)
type notFoundError struct {
t string
s Stream
}
func newNotFoundError(t string, s Stream) *notFoundError {
return ¬FoundError{t: t, s: s}
}
func (e *notFoundError) Error() string {
return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
}
// ErrMaxPeerServers will be returned if peer server limit is reached.
// It will be sent in the SubscribeErrorMsg.
var ErrMaxPeerServers = errors.New("max peer servers")
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*protocols.Peer
streamer *Registry
pq *pq.PriorityQueue
serverMu sync.RWMutex
clientMu sync.RWMutex // protects both clients and clientParams
servers map[Stream]*server
clients map[Stream]*client
// clientParams map keeps required client arguments
// that are set on Registry.Subscribe and used
// on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams
quit chan struct{}
spans sync.Map
}
type WrappedPriorityMsg struct {
Context context.Context
Msg interface{}
}
// NewPeer is the constructor for Peer
func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
p := &Peer{
Peer: peer,
pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: streamer,
servers: make(map[Stream]*server),
clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}),
spans: sync.Map{},
}
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
defer p.spans.Delete(wmsg.Context)
sp, ok := p.spans.Load(wmsg.Context)
if ok {
defer sp.(opentracing.Span).Finish()
}
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
p.Drop(err)
}
})
// basic monitoring for pq contention
go func(pq *pq.PriorityQueue) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
var len_maxi int
var cap_maxi int
for k := range pq.Queues {
if len_maxi < len(pq.Queues[k]) {
len_maxi = len(pq.Queues[k])
}
if cap_maxi < cap(pq.Queues[k]) {
cap_maxi = cap(pq.Queues[k])
}
}
metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
case <-p.quit:
return
}
}
}(p.pq)
go func() {
<-p.quit
cancel()
}()
return p
}
// Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
var msg interface{}
spanName := "send.chunk.delivery"
//we send different types of messages if delivery is for syncing or retrievals,
//even if handling and content of the message are the same,
//because swap accounting decides which messages need accounting based on the message type
if syncing {
msg = &ChunkDeliveryMsgSyncing{
Addr: chunk.Address(),
SData: chunk.Data(),
}
spanName += ".syncing"
} else {
msg = &ChunkDeliveryMsgRetrieval{
Addr: chunk.Address(),
SData: chunk.Data(),
}
spanName += ".retrieval"
}
return p.SendPriority(ctx, msg, priority, spanName)
}
// SendPriority sends message to the peer using the outgoing priority queue
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
if traceId != "" {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
traceId,
)
p.spans.Store(ctx, sp)
}
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
}
err := p.pq.Push(wmsg, int(priority))
if err == pq.ErrContention {
log.Warn("dropping peer on priority queue contention", "peer", p.ID())
p.Drop(err)
}
return err
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
var sp opentracing.Span
ctx, sp := spancontext.StartSpan(
context.TODO(),
"send.offered.hashes")
defer sp.Finish()
hashes, from, to, proof, err := s.setNextBatch(f, t)
if err != nil {
return err
}
// true only when quitting
if len(hashes) == 0 {
return nil
}
if proof == nil {
proof = &HandoverProof{
Handover: &Handover{},
}
}
s.currentBatch = hashes
msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
}
func (p *Peer) getServer(s Stream) (*server, error) {
p.serverMu.RLock()
defer p.serverMu.RUnlock()
server := p.servers[s]
if server == nil {
return nil, newNotFoundError("server", s)
}
return server, nil
}
func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
p.serverMu.Lock()
defer p.serverMu.Unlock()
if p.servers[s] != nil {
return nil, fmt.Errorf("server %s already registered", s)
}
if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers {
return nil, ErrMaxPeerServers
}
sessionIndex, err := o.SessionIndex()
if err != nil {
return nil, err
}
os := &server{
Server: o,
stream: s,
priority: priority,
sessionIndex: sessionIndex,
}
p.servers[s] = os
return os, nil
}
func (p *Peer) removeServer(s Stream) error {
p.serverMu.Lock()
defer p.serverMu.Unlock()
server, ok := p.servers[s]
if !ok {
return newNotFoundError("server", s)
}
server.Close()
delete(p.servers, s)
return nil
}
func (p *Peer) getClient(ctx context.Context, s Stream) (c *client, err error) {
var params *clientParams
func() {
p.clientMu.RLock()
defer p.clientMu.RUnlock()
c = p.clients[s]
if c != nil {
return
}
params = p.clientParams[s]
}()
if c != nil {
return c, nil
}
if params != nil {
//debug.PrintStack()
if err := params.waitClient(ctx); err != nil {
return nil, err
}
}
p.clientMu.RLock()
defer p.clientMu.RUnlock()
c = p.clients[s]
if c != nil {
return c, nil
}
return nil, newNotFoundError("client", s)
}
func (p *Peer) getOrSetClient(s Stream, from, to uint64) (c *client, created bool, err error) {
p.clientMu.Lock()
defer p.clientMu.Unlock()
c = p.clients[s]
if c != nil {
return c, false, nil
}
f, err := p.streamer.GetClientFunc(s.Name)
if err != nil {
return nil, false, err
}
is, err := f(p, s.Key, s.Live)
if err != nil {
return nil, false, err
}
cp, err := p.getClientParams(s)
if err != nil {
return nil, false, err
}
defer func() {
if err == nil {
if err := p.removeClientParams(s); err != nil {
log.Error("stream set client: remove client params", "stream", s, "peer", p, "err", err)
}
}
}()
intervalsKey := peerStreamIntervalsKey(p, s)
if s.Live {
// try to find previous history and live intervals and merge live into history
historyKey := peerStreamIntervalsKey(p, NewStream(s.Name, s.Key, false))
historyIntervals := &intervals.Intervals{}
err := p.streamer.intervalsStore.Get(historyKey, historyIntervals)
switch err {
case nil:
liveIntervals := &intervals.Intervals{}
err := p.streamer.intervalsStore.Get(intervalsKey, liveIntervals)
switch err {
case nil:
historyIntervals.Merge(liveIntervals)
if err := p.streamer.intervalsStore.Put(historyKey, historyIntervals); err != nil {
log.Error("stream set client: put history intervals", "stream", s, "peer", p, "err", err)
}
case state.ErrNotFound:
default:
log.Error("stream set client: get live intervals", "stream", s, "peer", p, "err", err)
}
case state.ErrNotFound:
default:
log.Error("stream set client: get history intervals", "stream", s, "peer", p, "err", err)
}
}
if err := p.streamer.intervalsStore.Put(intervalsKey, intervals.NewIntervals(from)); err != nil {
return nil, false, err
}
next := make(chan error, 1)
c = &client{
Client: is,
stream: s,
priority: cp.priority,
to: cp.to,
next: next,
quit: make(chan struct{}),
intervalsStore: p.streamer.intervalsStore,
intervalsKey: intervalsKey,
}
p.clients[s] = c
cp.clientCreated() // unblock all possible getClient calls that are waiting
next <- nil // this is to allow wantedKeysMsg before first batch arrives
return c, true, nil
}
func (p *Peer) removeClient(s Stream) error {
p.clientMu.Lock()
defer p.clientMu.Unlock()
client, ok := p.clients[s]
if !ok {
return newNotFoundError("client", s)
}
client.close()
delete(p.clients, s)
return nil
}
func (p *Peer) setClientParams(s Stream, params *clientParams) error {
p.clientMu.Lock()
defer p.clientMu.Unlock()
if p.clients[s] != nil {
return fmt.Errorf("client %s already exists", s)
}
if p.clientParams[s] != nil {
return fmt.Errorf("client params %s already set", s)
}
p.clientParams[s] = params
return nil
}
func (p *Peer) getClientParams(s Stream) (*clientParams, error) {
params := p.clientParams[s]
if params == nil {
return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID())
}
return params, nil
}
func (p *Peer) removeClientParams(s Stream) error {
_, ok := p.clientParams[s]
if !ok {
return newNotFoundError("client params", s)
}
delete(p.clientParams, s)
return nil
}
func (p *Peer) close() {
for _, s := range p.servers {
s.Close()
}
}