aboutsummaryrefslogtreecommitdiffstats
path: root/swarm
diff options
context:
space:
mode:
authorholisticode <holistic.computing@gmail.com>2019-01-08 07:59:00 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2019-01-08 07:59:00 +0800
commitae857e74bfda1f961dc5741441e5b36a2bb9aa93 (patch)
treee8d75eb51adccc215c63855b235e83ee45698e85 /swarm
parent56a3f6c03cc3c7ae38ab7354f8615c014bb2102a (diff)
downloadgo-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar
go-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.gz
go-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.bz2
go-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.lz
go-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.xz
go-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.zst
go-tangerine-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.zip
swarm, p2p/protocols: Stream accounting (#18337)
* swarm: completed 1st phase of swap accounting * swarm, p2p/protocols: added stream pricing * swarm/network/stream: gofmt simplify stream.go * swarm: fixed review comments * swarm: used snapshots for swap tests * swarm: custom retrieve for swap (less cascaded requests at any one time) * swarm: addressed PR comments * swarm: log output formatting * swarm: removed parallelism in swap tests * swarm: swap tests simplification * swarm: removed swap_test.go * swarm/network/stream: added prefix space for comments * swarm/network/stream: unit test for prices * swarm/network/stream: don't hardcode price * swarm/network/stream: fixed invalid price check
Diffstat (limited to 'swarm')
-rw-r--r--swarm/network/stream/stream.go110
-rw-r--r--swarm/network/stream/streamer_test.go31
2 files changed, 111 insertions, 30 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 090bef8d1..2e2c3c418 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -48,28 +48,28 @@ const (
HashSize = 32
)
-//Enumerate options for syncing and retrieval
+// Enumerate options for syncing and retrieval
type SyncingOption int
type RetrievalOption int
-//Syncing options
+// Syncing options
const (
- //Syncing disabled
+ // Syncing disabled
SyncingDisabled SyncingOption = iota
- //Register the client and the server but not subscribe
+ // Register the client and the server but not subscribe
SyncingRegisterOnly
- //Both client and server funcs are registered, subscribe sent automatically
+ // Both client and server funcs are registered, subscribe sent automatically
SyncingAutoSubscribe
)
const (
- //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
+ // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
RetrievalDisabled RetrievalOption = iota
- //Only the client side of the retrieve request is registered.
- //(light nodes do not serve retrieve requests)
- //once the client is registered, subscription to retrieve request stream is always sent
+ // Only the client side of the retrieve request is registered.
+ // (light nodes do not serve retrieve requests)
+ // once the client is registered, subscription to retrieve request stream is always sent
RetrievalClientOnly
- //Both client and server funcs are registered, subscribe sent automatically
+ // Both client and server funcs are registered, subscribe sent automatically
RetrievalEnabled
)
@@ -86,18 +86,18 @@ type Registry struct {
peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
- autoRetrieval bool //automatically subscribe to retrieve request stream
+ autoRetrieval bool // automatically subscribe to retrieve request stream
maxPeerServers int
- spec *protocols.Spec //this protocol's spec
- balance protocols.Balance //implements protocols.Balance, for accounting
- prices protocols.Prices //implements protocols.Prices, provides prices to accounting
+ balance protocols.Balance // implements protocols.Balance, for accounting
+ prices protocols.Prices // implements protocols.Prices, provides prices to accounting
+ spec *protocols.Spec // this protocol's spec
}
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- Syncing SyncingOption //Defines syncing behavior
- Retrieval RetrievalOption //Defines retrieval behavior
+ Syncing SyncingOption // Defines syncing behavior
+ Retrieval RetrievalOption // Defines retrieval behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
@@ -110,7 +110,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
- //check if retriaval has been disabled
+ // check if retrieval has been disabled
retrieval := options.Retrieval != RetrievalDisabled
streamer := &Registry{
@@ -130,7 +130,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
+ // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live {
@@ -140,20 +140,20 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
})
}
- //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
+ // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
if options.Retrieval != RetrievalDisabled {
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
}
- //If syncing is not disabled, the syncing functions are registered (both client and server)
+ // If syncing is not disabled, the syncing functions are registered (both client and server)
if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore)
}
- //if syncing is set to automatically subscribe to the syncing stream, start the subscription process
+ // if syncing is set to automatically subscribe to the syncing stream, start the subscription process
if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop
@@ -235,13 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
return streamer
}
-//we need to construct a spec instance per node instance
+// This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
+// For simulations to be able to run multiple nodes and not override the hook's balance,
+// we need to construct a spec instance per node instance
func (r *Registry) setupSpec() {
- //first create the "bare" spec
+ // first create the "bare" spec
r.createSpec()
- //if balance is nil, this node has been started without swap support (swapEnabled flag is false)
+ // now create the pricing object
+ r.createPriceOracle()
+ // if balance is nil, this node has been started without swap support (swapEnabled flag is false)
if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
- //swap is enabled, so setup the hook
+ // swap is enabled, so setup the hook
r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
}
}
@@ -533,11 +537,11 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
return p.handleWantedHashesMsg(ctx, msg)
case *ChunkDeliveryMsgRetrieval:
- //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *ChunkDeliveryMsgSyncing:
- //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *RetrieveRequestMsg:
@@ -726,9 +730,9 @@ func (c *clientParams) clientCreated() {
close(c.clientCreatedC)
}
-//GetSpec returns the streamer spec to callers
-//This used to be a global variable but for simulations with
-//multiple nodes its fields (notably the Hook) would be overwritten
+// GetSpec returns the streamer spec to callers
+// This used to be a global variable but for simulations with
+// multiple nodes its fields (notably the Hook) would be overwritten
func (r *Registry) GetSpec() *protocols.Spec {
return r.spec
}
@@ -756,6 +760,52 @@ func (r *Registry) createSpec() {
r.spec = spec
}
+// An accountable message needs some meta information attached to it
+// in order to evaluate the correct price
+type StreamerPrices struct {
+ priceMatrix map[reflect.Type]*protocols.Price
+ registry *Registry
+}
+
+// Price implements the accounting interface and returns the price for a specific message
+func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
+ t := reflect.TypeOf(msg).Elem()
+ return sp.priceMatrix[t]
+}
+
+// Instead of hardcoding the price, get it
+// through a function - it could be quite complex in the future
+func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
+ return uint64(1)
+}
+
+// Instead of hardcoding the price, get it
+// through a function - it could be quite complex in the future
+func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
+ return uint64(1)
+}
+
+// createPriceOracle sets up a matrix which can be queried to get
+// the price for a message via the Price method
+func (r *Registry) createPriceOracle() {
+ sp := &StreamerPrices{
+ registry: r,
+ }
+ sp.priceMatrix = map[reflect.Type]*protocols.Price{
+ reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
+ Value: sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
+ PerByte: true,
+ Payer: protocols.Receiver,
+ },
+ reflect.TypeOf(RetrieveRequestMsg{}): {
+ Value: sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
+ PerByte: false,
+ Payer: protocols.Sender,
+ },
+ }
+ r.prices = sp
+}
+
func (r *Registry) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 77fe55d34..e1b1c8286 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -921,3 +921,34 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
}
}
}
+
+//TestHasPriceImplementation is to check that the Registry has a
+//`Price` interface implementation
+func TestHasPriceImplementation(t *testing.T) {
+ _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
+ })
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if r.prices == nil {
+ t.Fatal("No prices implementation available for the stream protocol")
+ }
+
+ pricesInstance, ok := r.prices.(*StreamerPrices)
+ if !ok {
+ t.Fatal("`Registry` does not have the expected Prices instance")
+ }
+ price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{})
+ if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() {
+ t.Fatal("No prices set for chunk delivery msg")
+ }
+
+ price = pricesInstance.Price(&RetrieveRequestMsg{})
+ if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() {
+ t.Fatal("No prices set for chunk delivery msg")
+ }
+}