diff options
Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r-- | swarm/network/stream/peer.go | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go new file mode 100644 index 000000000..29984a911 --- /dev/null +++ b/swarm/network/stream/peer.go @@ -0,0 +1,328 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package stream + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/swarm/log" + pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" + "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" + "github.com/ethereum/go-ethereum/swarm/state" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +var sendTimeout = 30 * time.Second + +type notFoundError struct { + t string + s Stream +} + +func newNotFoundError(t string, s Stream) *notFoundError { + return ¬FoundError{t: t, s: s} +} + +func (e *notFoundError) Error() string { + return fmt.Sprintf("%s not found for stream %q", e.t, e.s) +} + +// Peer is the Peer extension for the streaming protocol +type Peer struct { + *protocols.Peer + streamer *Registry + pq *pq.PriorityQueue + serverMu sync.RWMutex + clientMu sync.RWMutex // protects both clients and clientParams + servers map[Stream]*server + clients map[Stream]*client + // clientParams map keeps required client arguments + // that are set on Registry.Subscribe and used + // on creating a new client in offered hashes handler. + clientParams map[Stream]*clientParams + quit chan struct{} +} + +// NewPeer is the constructor for Peer +func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { + p := &Peer{ + Peer: peer, + pq: pq.New(int(PriorityQueue), PriorityQueueCap), + streamer: streamer, + servers: make(map[Stream]*server), + clients: make(map[Stream]*client), + clientParams: make(map[Stream]*clientParams), + quit: make(chan struct{}), + } + ctx, cancel := context.WithCancel(context.Background()) + go p.pq.Run(ctx, func(i interface{}) { p.Send(i) }) + go func() { + <-p.quit + cancel() + }() + return p +} + +// Deliver sends a storeRequestMsg protocol message to the peer +func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error { + msg := &ChunkDeliveryMsg{ + Addr: chunk.Addr, + SData: chunk.SData, + } + return p.SendPriority(msg, priority) +} + +// SendPriority sends message to the peer using the outgoing priority queue +func (p *Peer) SendPriority(msg interface{}, priority uint8) error { + defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) + metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) + ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) + defer cancel() + return p.pq.Push(ctx, msg, int(priority)) +} + +// SendOfferedHashes sends OfferedHashesMsg protocol msg +func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { + hashes, from, to, proof, err := s.SetNextBatch(f, t) + if err != nil { + return err + } + // true only when quiting + if len(hashes) == 0 { + return nil + } + if proof == nil { + proof = &HandoverProof{ + Handover: &Handover{}, + } + } + s.currentBatch = hashes + msg := &OfferedHashesMsg{ + HandoverProof: proof, + Hashes: hashes, + From: from, + To: to, + Stream: s.stream, + } + log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) + return p.SendPriority(msg, s.priority) +} + +func (p *Peer) getServer(s Stream) (*server, error) { + p.serverMu.RLock() + defer p.serverMu.RUnlock() + + server := p.servers[s] + if server == nil { + return nil, newNotFoundError("server", s) + } + return server, nil +} + +func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) { + p.serverMu.Lock() + defer p.serverMu.Unlock() + + if p.servers[s] != nil { + return nil, fmt.Errorf("server %s already registered", s) + } + os := &server{ + Server: o, + stream: s, + priority: priority, + } + p.servers[s] = os + return os, nil +} + +func (p *Peer) removeServer(s Stream) error { + p.serverMu.Lock() + defer p.serverMu.Unlock() + + server, ok := p.servers[s] + if !ok { + return newNotFoundError("server", s) + } + server.Close() + delete(p.servers, s) + return nil +} + +func (p *Peer) getClient(ctx context.Context, s Stream) (c *client, err error) { + var params *clientParams + func() { + p.clientMu.RLock() + defer p.clientMu.RUnlock() + + c = p.clients[s] + if c != nil { + return + } + params = p.clientParams[s] + }() + if c != nil { + return c, nil + } + + if params != nil { + //debug.PrintStack() + if err := params.waitClient(ctx); err != nil { + return nil, err + } + } + + p.clientMu.RLock() + defer p.clientMu.RUnlock() + + c = p.clients[s] + if c != nil { + return c, nil + } + return nil, newNotFoundError("client", s) +} + +func (p *Peer) getOrSetClient(s Stream, from, to uint64) (c *client, created bool, err error) { + p.clientMu.Lock() + defer p.clientMu.Unlock() + + c = p.clients[s] + if c != nil { + return c, false, nil + } + + f, err := p.streamer.GetClientFunc(s.Name) + if err != nil { + return nil, false, err + } + + is, err := f(p, s.Key, s.Live) + if err != nil { + return nil, false, err + } + + cp, err := p.getClientParams(s) + if err != nil { + return nil, false, err + } + defer func() { + if err == nil { + if err := p.removeClientParams(s); err != nil { + log.Error("stream set client: remove client params", "stream", s, "peer", p, "err", err) + } + } + }() + + intervalsKey := peerStreamIntervalsKey(p, s) + if s.Live { + // try to find previous history and live intervals and merge live into history + historyKey := peerStreamIntervalsKey(p, NewStream(s.Name, s.Key, false)) + historyIntervals := &intervals.Intervals{} + err := p.streamer.intervalsStore.Get(historyKey, historyIntervals) + switch err { + case nil: + liveIntervals := &intervals.Intervals{} + err := p.streamer.intervalsStore.Get(intervalsKey, liveIntervals) + switch err { + case nil: + historyIntervals.Merge(liveIntervals) + if err := p.streamer.intervalsStore.Put(historyKey, historyIntervals); err != nil { + log.Error("stream set client: put history intervals", "stream", s, "peer", p, "err", err) + } + case state.ErrNotFound: + default: + log.Error("stream set client: get live intervals", "stream", s, "peer", p, "err", err) + } + case state.ErrNotFound: + default: + log.Error("stream set client: get history intervals", "stream", s, "peer", p, "err", err) + } + } + + if err := p.streamer.intervalsStore.Put(intervalsKey, intervals.NewIntervals(from)); err != nil { + return nil, false, err + } + + next := make(chan error, 1) + c = &client{ + Client: is, + stream: s, + priority: cp.priority, + to: cp.to, + next: next, + quit: make(chan struct{}), + intervalsStore: p.streamer.intervalsStore, + intervalsKey: intervalsKey, + } + p.clients[s] = c + cp.clientCreated() // unblock all possible getClient calls that are waiting + next <- nil // this is to allow wantedKeysMsg before first batch arrives + return c, true, nil +} + +func (p *Peer) removeClient(s Stream) error { + p.clientMu.Lock() + defer p.clientMu.Unlock() + + client, ok := p.clients[s] + if !ok { + return newNotFoundError("client", s) + } + client.close() + return nil +} + +func (p *Peer) setClientParams(s Stream, params *clientParams) error { + p.clientMu.Lock() + defer p.clientMu.Unlock() + + if p.clients[s] != nil { + return fmt.Errorf("client %s already exists", s) + } + if p.clientParams[s] != nil { + return fmt.Errorf("client params %s already set", s) + } + p.clientParams[s] = params + return nil +} + +func (p *Peer) getClientParams(s Stream) (*clientParams, error) { + params := p.clientParams[s] + if params == nil { + return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID()) + } + return params, nil +} + +func (p *Peer) removeClientParams(s Stream) error { + _, ok := p.clientParams[s] + if !ok { + return newNotFoundError("client params", s) + } + delete(p.clientParams, s) + return nil +} + +func (p *Peer) close() { + for _, s := range p.servers { + s.Close() + } +} |