aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go76
1 files changed, 51 insertions, 25 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 695ff0c50..32e107823 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
+ "reflect"
"sync"
"time"
@@ -87,6 +88,9 @@ type Registry struct {
intervalsStore state.Store
autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int
+ spec *protocols.Spec //this protocol's spec
+ balance protocols.Balance //implements protocols.Balance, for accounting
+ prices protocols.Prices //implements protocols.Prices, provides prices to accounting
}
// RegistryOptions holds optional values for NewRegistry constructor.
@@ -99,7 +103,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -119,7 +123,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
intervalsStore: intervalsStore,
autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
+ balance: balance,
}
+ streamer.setupSpec()
+
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
@@ -228,6 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
return streamer
}
+//we need to construct a spec instance per node instance
+func (r *Registry) setupSpec() {
+ //first create the "bare" spec
+ r.createSpec()
+ //if balance is nil, this node has been started without swap support (swapEnabled flag is false)
+ if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
+ //swap is enabled, so setup the hook
+ r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
+ }
+}
+
// RegisterClient registers an incoming streamer constructor
func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
r.clientMu.Lock()
@@ -492,7 +510,7 @@ func (r *Registry) updateSyncing() {
}
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- peer := protocols.NewPeer(p, rw, Spec)
+ peer := protocols.NewPeer(p, rw, r.spec)
bp := network.NewBzzPeer(peer)
np := network.NewPeer(bp, r.delivery.kad)
r.delivery.kad.On(np)
@@ -716,35 +734,43 @@ func (c *clientParams) clientCreated() {
close(c.clientCreatedC)
}
-// Spec is the spec of the streamer protocol
-var Spec = &protocols.Spec{
- Name: "stream",
- Version: 8,
- MaxMsgSize: 10 * 1024 * 1024,
- Messages: []interface{}{
- UnsubscribeMsg{},
- OfferedHashesMsg{},
- WantedHashesMsg{},
- TakeoverProofMsg{},
- SubscribeMsg{},
- RetrieveRequestMsg{},
- ChunkDeliveryMsgRetrieval{},
- SubscribeErrorMsg{},
- RequestSubscriptionMsg{},
- QuitMsg{},
- ChunkDeliveryMsgSyncing{},
- },
+//GetSpec returns the streamer spec to callers
+//This used to be a global variable but for simulations with
+//multiple nodes its fields (notably the Hook) would be overwritten
+func (r *Registry) GetSpec() *protocols.Spec {
+ return r.spec
+}
+
+func (r *Registry) createSpec() {
+ // Spec is the spec of the streamer protocol
+ var spec = &protocols.Spec{
+ Name: "stream",
+ Version: 8,
+ MaxMsgSize: 10 * 1024 * 1024,
+ Messages: []interface{}{
+ UnsubscribeMsg{},
+ OfferedHashesMsg{},
+ WantedHashesMsg{},
+ TakeoverProofMsg{},
+ SubscribeMsg{},
+ RetrieveRequestMsg{},
+ ChunkDeliveryMsgRetrieval{},
+ SubscribeErrorMsg{},
+ RequestSubscriptionMsg{},
+ QuitMsg{},
+ ChunkDeliveryMsgSyncing{},
+ },
+ }
+ r.spec = spec
}
func (r *Registry) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
- Name: Spec.Name,
- Version: Spec.Version,
- Length: Spec.Length(),
+ Name: r.spec.Name,
+ Version: r.spec.Version,
+ Length: r.spec.Length(),
Run: r.runProtocol,
- // NodeInfo: ,
- // PeerInfo: ,
},
}
}