aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r--swarm/network/stream/peer.go328
1 files changed, 328 insertions, 0 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
new file mode 100644
index 000000000..29984a911
--- /dev/null
+++ b/swarm/network/stream/peer.go
@@ -0,0 +1,328 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package stream
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
+ "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+var sendTimeout = 30 * time.Second
+
+type notFoundError struct {
+ t string
+ s Stream
+}
+
+func newNotFoundError(t string, s Stream) *notFoundError {
+ return &notFoundError{t: t, s: s}
+}
+
+func (e *notFoundError) Error() string {
+ return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
+}
+
+// Peer is the Peer extension for the streaming protocol
+type Peer struct {
+ *protocols.Peer
+ streamer *Registry
+ pq *pq.PriorityQueue
+ serverMu sync.RWMutex
+ clientMu sync.RWMutex // protects both clients and clientParams
+ servers map[Stream]*server
+ clients map[Stream]*client
+ // clientParams map keeps required client arguments
+ // that are set on Registry.Subscribe and used
+ // on creating a new client in offered hashes handler.
+ clientParams map[Stream]*clientParams
+ quit chan struct{}
+}
+
+// NewPeer is the constructor for Peer
+func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
+ p := &Peer{
+ Peer: peer,
+ pq: pq.New(int(PriorityQueue), PriorityQueueCap),
+ streamer: streamer,
+ servers: make(map[Stream]*server),
+ clients: make(map[Stream]*client),
+ clientParams: make(map[Stream]*clientParams),
+ quit: make(chan struct{}),
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ go p.pq.Run(ctx, func(i interface{}) { p.Send(i) })
+ go func() {
+ <-p.quit
+ cancel()
+ }()
+ return p
+}
+
+// Deliver sends a storeRequestMsg protocol message to the peer
+func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error {
+ msg := &ChunkDeliveryMsg{
+ Addr: chunk.Addr,
+ SData: chunk.SData,
+ }
+ return p.SendPriority(msg, priority)
+}
+
+// SendPriority sends message to the peer using the outgoing priority queue
+func (p *Peer) SendPriority(msg interface{}, priority uint8) error {
+ defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
+ metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
+ ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
+ defer cancel()
+ return p.pq.Push(ctx, msg, int(priority))
+}
+
+// SendOfferedHashes sends OfferedHashesMsg protocol msg
+func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
+ hashes, from, to, proof, err := s.SetNextBatch(f, t)
+ if err != nil {
+ return err
+ }
+ // true only when quiting
+ if len(hashes) == 0 {
+ return nil
+ }
+ if proof == nil {
+ proof = &HandoverProof{
+ Handover: &Handover{},
+ }
+ }
+ s.currentBatch = hashes
+ msg := &OfferedHashesMsg{
+ HandoverProof: proof,
+ Hashes: hashes,
+ From: from,
+ To: to,
+ Stream: s.stream,
+ }
+ log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
+ return p.SendPriority(msg, s.priority)
+}
+
+func (p *Peer) getServer(s Stream) (*server, error) {
+ p.serverMu.RLock()
+ defer p.serverMu.RUnlock()
+
+ server := p.servers[s]
+ if server == nil {
+ return nil, newNotFoundError("server", s)
+ }
+ return server, nil
+}
+
+func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
+ p.serverMu.Lock()
+ defer p.serverMu.Unlock()
+
+ if p.servers[s] != nil {
+ return nil, fmt.Errorf("server %s already registered", s)
+ }
+ os := &server{
+ Server: o,
+ stream: s,
+ priority: priority,
+ }
+ p.servers[s] = os
+ return os, nil
+}
+
+func (p *Peer) removeServer(s Stream) error {
+ p.serverMu.Lock()
+ defer p.serverMu.Unlock()
+
+ server, ok := p.servers[s]
+ if !ok {
+ return newNotFoundError("server", s)
+ }
+ server.Close()
+ delete(p.servers, s)
+ return nil
+}
+
+func (p *Peer) getClient(ctx context.Context, s Stream) (c *client, err error) {
+ var params *clientParams
+ func() {
+ p.clientMu.RLock()
+ defer p.clientMu.RUnlock()
+
+ c = p.clients[s]
+ if c != nil {
+ return
+ }
+ params = p.clientParams[s]
+ }()
+ if c != nil {
+ return c, nil
+ }
+
+ if params != nil {
+ //debug.PrintStack()
+ if err := params.waitClient(ctx); err != nil {
+ return nil, err
+ }
+ }
+
+ p.clientMu.RLock()
+ defer p.clientMu.RUnlock()
+
+ c = p.clients[s]
+ if c != nil {
+ return c, nil
+ }
+ return nil, newNotFoundError("client", s)
+}
+
+func (p *Peer) getOrSetClient(s Stream, from, to uint64) (c *client, created bool, err error) {
+ p.clientMu.Lock()
+ defer p.clientMu.Unlock()
+
+ c = p.clients[s]
+ if c != nil {
+ return c, false, nil
+ }
+
+ f, err := p.streamer.GetClientFunc(s.Name)
+ if err != nil {
+ return nil, false, err
+ }
+
+ is, err := f(p, s.Key, s.Live)
+ if err != nil {
+ return nil, false, err
+ }
+
+ cp, err := p.getClientParams(s)
+ if err != nil {
+ return nil, false, err
+ }
+ defer func() {
+ if err == nil {
+ if err := p.removeClientParams(s); err != nil {
+ log.Error("stream set client: remove client params", "stream", s, "peer", p, "err", err)
+ }
+ }
+ }()
+
+ intervalsKey := peerStreamIntervalsKey(p, s)
+ if s.Live {
+ // try to find previous history and live intervals and merge live into history
+ historyKey := peerStreamIntervalsKey(p, NewStream(s.Name, s.Key, false))
+ historyIntervals := &intervals.Intervals{}
+ err := p.streamer.intervalsStore.Get(historyKey, historyIntervals)
+ switch err {
+ case nil:
+ liveIntervals := &intervals.Intervals{}
+ err := p.streamer.intervalsStore.Get(intervalsKey, liveIntervals)
+ switch err {
+ case nil:
+ historyIntervals.Merge(liveIntervals)
+ if err := p.streamer.intervalsStore.Put(historyKey, historyIntervals); err != nil {
+ log.Error("stream set client: put history intervals", "stream", s, "peer", p, "err", err)
+ }
+ case state.ErrNotFound:
+ default:
+ log.Error("stream set client: get live intervals", "stream", s, "peer", p, "err", err)
+ }
+ case state.ErrNotFound:
+ default:
+ log.Error("stream set client: get history intervals", "stream", s, "peer", p, "err", err)
+ }
+ }
+
+ if err := p.streamer.intervalsStore.Put(intervalsKey, intervals.NewIntervals(from)); err != nil {
+ return nil, false, err
+ }
+
+ next := make(chan error, 1)
+ c = &client{
+ Client: is,
+ stream: s,
+ priority: cp.priority,
+ to: cp.to,
+ next: next,
+ quit: make(chan struct{}),
+ intervalsStore: p.streamer.intervalsStore,
+ intervalsKey: intervalsKey,
+ }
+ p.clients[s] = c
+ cp.clientCreated() // unblock all possible getClient calls that are waiting
+ next <- nil // this is to allow wantedKeysMsg before first batch arrives
+ return c, true, nil
+}
+
+func (p *Peer) removeClient(s Stream) error {
+ p.clientMu.Lock()
+ defer p.clientMu.Unlock()
+
+ client, ok := p.clients[s]
+ if !ok {
+ return newNotFoundError("client", s)
+ }
+ client.close()
+ return nil
+}
+
+func (p *Peer) setClientParams(s Stream, params *clientParams) error {
+ p.clientMu.Lock()
+ defer p.clientMu.Unlock()
+
+ if p.clients[s] != nil {
+ return fmt.Errorf("client %s already exists", s)
+ }
+ if p.clientParams[s] != nil {
+ return fmt.Errorf("client params %s already set", s)
+ }
+ p.clientParams[s] = params
+ return nil
+}
+
+func (p *Peer) getClientParams(s Stream) (*clientParams, error) {
+ params := p.clientParams[s]
+ if params == nil {
+ return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID())
+ }
+ return params, nil
+}
+
+func (p *Peer) removeClientParams(s Stream) error {
+ _, ok := p.clientParams[s]
+ if !ok {
+ return newNotFoundError("client params", s)
+ }
+ delete(p.clientParams, s)
+ return nil
+}
+
+func (p *Peer) close() {
+ for _, s := range p.servers {
+ s.Close()
+ }
+}