aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go36
1 files changed, 18 insertions, 18 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 319fc62c9..ea7cce8cb 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -25,7 +25,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
@@ -48,15 +48,15 @@ const (
// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
+ addr enode.ID
api *API
- addr *network.BzzAddr
skipCheck bool
clientMu sync.RWMutex
serverMu sync.RWMutex
peersMu sync.RWMutex
serverFuncs map[string]func(*Peer, string, bool) (Server, error)
clientFuncs map[string]func(*Peer, string, bool) (Client, error)
- peers map[discover.NodeID]*Peer
+ peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
doRetrieve bool
@@ -71,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -79,11 +79,11 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore stora
options.SyncUpdateDelay = 15 * time.Second
}
streamer := &Registry{
- addr: addr,
+ addr: localID,
skipCheck: options.SkipCheck,
serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
- peers: make(map[discover.NodeID]*Peer),
+ peers: make(map[enode.ID]*Peer),
delivery: delivery,
intervalsStore: intervalsStore,
doRetrieve: options.DoRetrieve,
@@ -220,7 +220,7 @@ func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Serv
return f, nil
}
-func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Range, prio uint8) error {
+func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error {
// check if the stream is registered
if _, err := r.GetServerFunc(s.Name); err != nil {
return err
@@ -248,7 +248,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang
}
// Subscribe initiates the streamer
-func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priority uint8) error {
+func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error {
// check if the stream is registered
if _, err := r.GetClientFunc(s.Name); err != nil {
return err
@@ -288,7 +288,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit
return peer.SendPriority(context.TODO(), msg, priority)
}
-func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
+func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
peer := r.getPeer(peerId)
if peer == nil {
return fmt.Errorf("peer not found %v", peerId)
@@ -307,7 +307,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
// Quit sends the QuitMsg to the peer to remove the
// stream peer client and terminate the streaming.
-func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
+func (r *Registry) Quit(peerId enode.ID, s Stream) error {
peer := r.getPeer(peerId)
if peer == nil {
log.Debug("stream quit: peer not found", "peer", peerId, "stream", s)
@@ -327,7 +327,7 @@ func (r *Registry) NodeInfo() interface{} {
return nil
}
-func (r *Registry) PeerInfo(id discover.NodeID) interface{} {
+func (r *Registry) PeerInfo(id enode.ID) interface{} {
return nil
}
@@ -335,7 +335,7 @@ func (r *Registry) Close() error {
return r.intervalsStore.Close()
}
-func (r *Registry) getPeer(peerId discover.NodeID) *Peer {
+func (r *Registry) getPeer(peerId enode.ID) *Peer {
r.peersMu.RLock()
defer r.peersMu.RUnlock()
@@ -390,7 +390,7 @@ func (r *Registry) updateSyncing() {
// map of all SYNC streams for all peers
// used at the and of the function to remove servers
// that are not needed anymore
- subs := make(map[discover.NodeID]map[Stream]struct{})
+ subs := make(map[enode.ID]map[Stream]struct{})
r.peersMu.RLock()
for id, peer := range r.peers {
peer.serverMu.RLock()
@@ -407,8 +407,8 @@ func (r *Registry) updateSyncing() {
r.peersMu.RUnlock()
// request subscriptions for all nodes and bins
- kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
- log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))
+ kad.EachBin(r.addr[:], pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
+ log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
// bin is always less then 256 and it is safe to convert it to type uint8
stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
@@ -446,7 +446,7 @@ func (r *Registry) updateSyncing() {
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, Spec)
- bp := network.NewBzzPeer(peer, r.addr)
+ bp := network.NewBzzPeer(peer)
np := network.NewPeer(bp, r.delivery.kad)
r.delivery.kad.On(np)
defer r.delivery.kad.Off(np)
@@ -724,10 +724,10 @@ func NewAPI(r *Registry) *API {
}
}
-func (api *API) SubscribeStream(peerId discover.NodeID, s Stream, history *Range, priority uint8) error {
+func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error {
return api.streamer.Subscribe(peerId, s, history, priority)
}
-func (api *API) UnsubscribeStream(peerId discover.NodeID, s Stream) error {
+func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
return api.streamer.Unsubscribe(peerId, s)
}