aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go36
1 files changed, 18 insertions, 18 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 6dcf31165..3b1b11d36 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -25,7 +25,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
@@ -48,15 +48,15 @@ const (
// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
+ addr enode.ID
api *API
- addr *network.BzzAddr
skipCheck bool
clientMu sync.RWMutex
serverMu sync.RWMutex
peersMu sync.RWMutex
serverFuncs map[string]func(*Peer, string, bool) (Server, error)
clientFuncs map[string]func(*Peer, string, bool) (Client, error)
- peers map[discover.NodeID]*Peer
+ peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
doRetrieve bool
@@ -73,7 +73,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -81,11 +81,11 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore stora
options.SyncUpdateDelay = 15 * time.Second
}
streamer := &Registry{
- addr: addr,
+ addr: localID,
skipCheck: options.SkipCheck,
serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
- peers: make(map[discover.NodeID]*Peer),
+ peers: make(map[enode.ID]*Peer),
delivery: delivery,
intervalsStore: intervalsStore,
doRetrieve: options.DoRetrieve,
@@ -223,7 +223,7 @@ func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Serv
return f, nil
}
-func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Range, prio uint8) error {
+func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error {
// check if the stream is registered
if _, err := r.GetServerFunc(s.Name); err != nil {
return err
@@ -251,7 +251,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang
}
// Subscribe initiates the streamer
-func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priority uint8) error {
+func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error {
// check if the stream is registered
if _, err := r.GetClientFunc(s.Name); err != nil {
return err
@@ -291,7 +291,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit
return peer.SendPriority(context.TODO(), msg, priority)
}
-func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
+func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
peer := r.getPeer(peerId)
if peer == nil {
return fmt.Errorf("peer not found %v", peerId)
@@ -310,7 +310,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
// Quit sends the QuitMsg to the peer to remove the
// stream peer client and terminate the streaming.
-func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
+func (r *Registry) Quit(peerId enode.ID, s Stream) error {
peer := r.getPeer(peerId)
if peer == nil {
log.Debug("stream quit: peer not found", "peer", peerId, "stream", s)
@@ -330,7 +330,7 @@ func (r *Registry) NodeInfo() interface{} {
return nil
}
-func (r *Registry) PeerInfo(id discover.NodeID) interface{} {
+func (r *Registry) PeerInfo(id enode.ID) interface{} {
return nil
}
@@ -338,7 +338,7 @@ func (r *Registry) Close() error {
return r.intervalsStore.Close()
}
-func (r *Registry) getPeer(peerId discover.NodeID) *Peer {
+func (r *Registry) getPeer(peerId enode.ID) *Peer {
r.peersMu.RLock()
defer r.peersMu.RUnlock()
@@ -393,7 +393,7 @@ func (r *Registry) updateSyncing() {
// map of all SYNC streams for all peers
// used at the and of the function to remove servers
// that are not needed anymore
- subs := make(map[discover.NodeID]map[Stream]struct{})
+ subs := make(map[enode.ID]map[Stream]struct{})
r.peersMu.RLock()
for id, peer := range r.peers {
peer.serverMu.RLock()
@@ -410,8 +410,8 @@ func (r *Registry) updateSyncing() {
r.peersMu.RUnlock()
// request subscriptions for all nodes and bins
- kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
- log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))
+ kad.EachBin(r.addr[:], pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
+ log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
// bin is always less then 256 and it is safe to convert it to type uint8
stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
@@ -449,7 +449,7 @@ func (r *Registry) updateSyncing() {
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, Spec)
- bp := network.NewBzzPeer(peer, r.addr)
+ bp := network.NewBzzPeer(peer)
np := network.NewPeer(bp, r.delivery.kad)
r.delivery.kad.On(np)
defer r.delivery.kad.Off(np)
@@ -727,10 +727,10 @@ func NewAPI(r *Registry) *API {
}
}
-func (api *API) SubscribeStream(peerId discover.NodeID, s Stream, history *Range, priority uint8) error {
+func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error {
return api.streamer.Subscribe(peerId, s, history, priority)
}
-func (api *API) UnsubscribeStream(peerId discover.NodeID, s Stream) error {
+func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
return api.streamer.Unsubscribe(peerId, s)
}