aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go46
1 files changed, 16 insertions, 30 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index cd0580a0c..1f1f34b7b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,10 +32,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -43,8 +41,8 @@ const (
Mid
High
Top
- PriorityQueue // number of queues
- PriorityQueueCap = 32 // queue capacity
+ PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
+ PriorityQueueCap = 128 // queue capacity
HashSize = 32
)
@@ -73,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
- return NewSwarmChunkServer(delivery.db), nil
+ return NewSwarmChunkServer(delivery.chunkStore), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
- RegisterSwarmSyncerServer(streamer, db)
- RegisterSwarmSyncerClient(streamer, db)
+ RegisterSwarmSyncerServer(streamer, syncChunkStore)
+ RegisterSwarmSyncerClient(streamer, syncChunkStore)
if options.DoSync {
// latestIntC function ensures that
@@ -130,7 +128,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
// wait for kademlia table to be healthy
time.Sleep(options.SyncUpdateDelay)
- kad := streamer.delivery.overlay.(*network.Kademlia)
+ kad := streamer.delivery.kad
depthC := latestIntC(kad.NeighbourhoodDepthC())
addressBookSizeC := latestIntC(kad.AddrCountC())
@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
return peer.Send(context.TODO(), msg)
}
-func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "registry.retrieve")
- defer sp.Finish()
-
- return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
-}
-
func (r *Registry) NodeInfo() interface{} {
return nil
}
@@ -398,9 +386,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
// and they are no longer required after iteration, request to Quit
// them will be send to appropriate peers.
func (r *Registry) updateSyncing() {
- // if overlay in not Kademlia, panic
- kad := r.delivery.overlay.(*network.Kademlia)
-
+ kad := r.delivery.kad
// map of all SYNC streams for all peers
// used at the and of the function to remove servers
// that are not needed anymore
@@ -421,8 +407,7 @@ func (r *Registry) updateSyncing() {
r.peersMu.RUnlock()
// request subscriptions for all nodes and bins
- kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool {
- p := conn.(network.Peer)
+ kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))
// bin is always less then 256 and it is safe to convert it to type uint8
@@ -461,10 +446,11 @@ func (r *Registry) updateSyncing() {
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, Spec)
- bzzPeer := network.NewBzzTestPeer(peer, r.addr)
- r.delivery.overlay.On(bzzPeer)
- defer r.delivery.overlay.Off(bzzPeer)
- return r.Run(bzzPeer)
+ bp := network.NewBzzPeer(peer, r.addr)
+ np := network.NewPeer(bp, r.delivery.kad)
+ r.delivery.kad.On(np)
+ defer r.delivery.kad.Off(np)
+ return r.Run(bp)
}
// HandleMsg is the message handler that delegates incoming messages
@@ -559,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData(context.Context, []byte) func()
+ NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}