diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 46 |
1 files changed, 16 insertions, 30 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index cd0580a0c..1f1f34b7b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,10 +32,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -43,8 +41,8 @@ const ( Mid High Top - PriorityQueue // number of queues - PriorityQueueCap = 32 // queue capacity + PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top + PriorityQueueCap = 128 // queue capacity HashSize = 32 ) @@ -73,7 +71,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { if options == nil { options = &RegistryOptions{} } @@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) { - return NewSwarmChunkServer(delivery.db), nil + return NewSwarmChunkServer(delivery.chunkStore), nil }) streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live)) + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) }) - RegisterSwarmSyncerServer(streamer, db) - RegisterSwarmSyncerClient(streamer, db) + RegisterSwarmSyncerServer(streamer, syncChunkStore) + RegisterSwarmSyncerClient(streamer, syncChunkStore) if options.DoSync { // latestIntC function ensures that @@ -130,7 +128,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i // wait for kademlia table to be healthy time.Sleep(options.SyncUpdateDelay) - kad := streamer.delivery.overlay.(*network.Kademlia) + kad := streamer.delivery.kad depthC := latestIntC(kad.NeighbourhoodDepthC()) addressBookSizeC := latestIntC(kad.AddrCountC()) @@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "registry.retrieve") - defer sp.Finish() - - return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) -} - func (r *Registry) NodeInfo() interface{} { return nil } @@ -398,9 +386,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { // and they are no longer required after iteration, request to Quit // them will be send to appropriate peers. func (r *Registry) updateSyncing() { - // if overlay in not Kademlia, panic - kad := r.delivery.overlay.(*network.Kademlia) - + kad := r.delivery.kad // map of all SYNC streams for all peers // used at the and of the function to remove servers // that are not needed anymore @@ -421,8 +407,7 @@ func (r *Registry) updateSyncing() { r.peersMu.RUnlock() // request subscriptions for all nodes and bins - kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool { - p := conn.(network.Peer) + kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool { log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin)) // bin is always less then 256 and it is safe to convert it to type uint8 @@ -461,10 +446,11 @@ func (r *Registry) updateSyncing() { func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := protocols.NewPeer(p, rw, Spec) - bzzPeer := network.NewBzzTestPeer(peer, r.addr) - r.delivery.overlay.On(bzzPeer) - defer r.delivery.overlay.Off(bzzPeer) - return r.Run(bzzPeer) + bp := network.NewBzzPeer(peer, r.addr) + np := network.NewPeer(bp, r.delivery.kad) + r.delivery.kad.On(np) + defer r.delivery.kad.Off(np) + return r.Run(bp) } // HandleMsg is the message handler that delegates incoming messages @@ -559,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData(context.Context, []byte) func() + NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } |