From 4976fcc91a43b5c7047c51a03985887b694f0fbb Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 24 Jan 2019 12:02:18 +0100 Subject: swarm: bootnode-mode, new bootnodes and no p2p package discovery (#18498) (cherry picked from commit bbd120354a8d226b446591eeda9f9462cb9b690a) --- swarm/swarm.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) (limited to 'swarm/swarm.go') diff --git a/swarm/swarm.go b/swarm/swarm.go index db52675fd..d17d81320 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -84,12 +84,11 @@ type Swarm struct { tracerClose io.Closer } -// creates a new swarm service instance +// NewSwarm creates a new swarm service instance // implements node.Service // If mockStore is not nil, it will be used as the storage for chunk data. // MockStore should be used only for testing. func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) { - if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) { return nil, fmt.Errorf("empty public key") } @@ -116,10 +115,11 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e config.HiveParams.Discovery = true bzzconfig := &network.BzzConfig{ - NetworkID: config.NetworkID, - OverlayAddr: common.FromHex(config.BzzKey), - HiveParams: config.HiveParams, - LightNode: config.LightNodeEnabled, + NetworkID: config.NetworkID, + OverlayAddr: common.FromHex(config.BzzKey), + HiveParams: config.HiveParams, + LightNode: config.LightNodeEnabled, + BootnodeMode: config.BootnodeMode, } self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db")) @@ -455,12 +455,16 @@ func (self *Swarm) Stop() error { return err } -// implements the node.Service interface -func (self *Swarm) Protocols() (protos []p2p.Protocol) { - protos = append(protos, self.bzz.Protocols()...) +// Protocols implements the node.Service interface +func (s *Swarm) Protocols() (protos []p2p.Protocol) { + if s.config.BootnodeMode { + protos = append(protos, s.bzz.Protocols()...) + } else { + protos = append(protos, s.bzz.Protocols()...) - if self.ps != nil { - protos = append(protos, self.ps.Protocols()...) + if s.ps != nil { + protos = append(protos, s.ps.Protocols()...) + } } return } -- cgit v1.2.3 From b774d0a507017b2ba7a0d02f6aa23a0a83d1a768 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Thu, 24 Jan 2019 12:02:47 +0100 Subject: swarm: fix a data race on startTime (#18511) (cherry picked from commit fa34429a2695f57bc0a96cd78f25e86700d8ee44) --- swarm/swarm.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'swarm/swarm.go') diff --git a/swarm/swarm.go b/swarm/swarm.go index d17d81320..9ece8bb66 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -56,7 +56,6 @@ import ( ) var ( - startTime time.Time updateGaugesPeriod = 5 * time.Second startCounter = metrics.NewRegisteredCounter("stack,start", nil) stopCounter = metrics.NewRegisteredCounter("stack,stop", nil) @@ -80,6 +79,7 @@ type Swarm struct { swap *swap.Swap stateStore *state.DBStore accountingMetrics *protocols.AccountingMetrics + startTime time.Time tracerClose io.Closer } @@ -344,7 +344,7 @@ Start is called when the stack is started */ // implements the node.Service interface func (self *Swarm) Start(srv *p2p.Server) error { - startTime = time.Now() + self.startTime = time.Now() self.tracerClose = tracing.Closer @@ -414,7 +414,7 @@ func (self *Swarm) periodicallyUpdateGauges() { } func (self *Swarm) updateGauges() { - uptimeGauge.Update(time.Since(startTime).Nanoseconds()) + uptimeGauge.Update(time.Since(self.startTime).Nanoseconds()) requestsCacheGauge.Update(int64(self.netStore.RequestsCacheLen())) } -- cgit v1.2.3 From d1ace4f344616fb6fa8643872c1f9cac89f8549e Mon Sep 17 00:00:00 2001 From: holisticode Date: Thu, 7 Feb 2019 09:49:19 -0500 Subject: swarm: Debug API and HasChunks() API endpoint (#18980) (cherry picked from commit 41597c2856d6ac7328baca1340c3e36ab0edd382) --- swarm/swarm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'swarm/swarm.go') diff --git a/swarm/swarm.go b/swarm/swarm.go index 9ece8bb66..cba4e73ef 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -485,7 +485,7 @@ func (self *Swarm) APIs() []rpc.API { { Namespace: "bzz", Version: "3.0", - Service: api.NewControl(self.api, self.bzz.Hive), + Service: api.NewInspector(self.api, self.bzz.Hive, self.netStore), Public: false, }, { -- cgit v1.2.3 From a0127019c3d516e8d8cf83839583bcf71af763e0 Mon Sep 17 00:00:00 2001 From: Elad Date: Wed, 13 Feb 2019 14:15:03 +0700 Subject: swarm: fix uptime gauge update goroutine leak by introducing cleanup functions (#19040) (cherry picked from commit d596bea2d501d20b92e0fd4baa8bba682157dfa7) --- swarm/swarm.go | 53 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 21 deletions(-) (limited to 'swarm/swarm.go') diff --git a/swarm/swarm.go b/swarm/swarm.go index cba4e73ef..cb914d39a 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -79,7 +79,7 @@ type Swarm struct { swap *swap.Swap stateStore *state.DBStore accountingMetrics *protocols.AccountingMetrics - startTime time.Time + cleanupFuncs []func() error tracerClose io.Closer } @@ -106,9 +106,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e } self = &Swarm{ - config: config, - backend: backend, - privateKey: config.ShiftPrivateKey(), + config: config, + backend: backend, + privateKey: config.ShiftPrivateKey(), + cleanupFuncs: []func() error{}, } log.Debug("Setting up Swarm service components") @@ -344,7 +345,7 @@ Start is called when the stack is started */ // implements the node.Service interface func (self *Swarm) Start(srv *p2p.Server) error { - self.startTime = time.Now() + startTime := time.Now() self.tracerClose = tracing.Closer @@ -396,26 +397,28 @@ func (self *Swarm) Start(srv *p2p.Server) error { }() } - self.periodicallyUpdateGauges() + doneC := make(chan struct{}) - startCounter.Inc(1) - self.streamer.Start(srv) - return nil -} + self.cleanupFuncs = append(self.cleanupFuncs, func() error { + close(doneC) + return nil + }) -func (self *Swarm) periodicallyUpdateGauges() { - ticker := time.NewTicker(updateGaugesPeriod) - - go func() { - for range ticker.C { - self.updateGauges() + go func(time.Time) { + for { + select { + case <-time.After(updateGaugesPeriod): + uptimeGauge.Update(time.Since(startTime).Nanoseconds()) + requestsCacheGauge.Update(int64(self.netStore.RequestsCacheLen())) + case <-doneC: + return + } } - }() -} + }(startTime) -func (self *Swarm) updateGauges() { - uptimeGauge.Update(time.Since(self.startTime).Nanoseconds()) - requestsCacheGauge.Update(int64(self.netStore.RequestsCacheLen())) + startCounter.Inc(1) + self.streamer.Start(srv) + return nil } // implements the node.Service interface @@ -452,6 +455,14 @@ func (self *Swarm) Stop() error { if self.stateStore != nil { self.stateStore.Close() } + + for _, cleanF := range self.cleanupFuncs { + err = cleanF() + if err != nil { + log.Error("encountered an error while running cleanup function", "err", err) + break + } + } return err } -- cgit v1.2.3 From fd34bf594c7aac73530a449130387a0797fd1977 Mon Sep 17 00:00:00 2001 From: Kiel barry Date: Mon, 30 Apr 2018 16:05:24 -0700 Subject: contracts/*: golint updates for this or self warning (cherry picked from commit 53b823afc8c24337290ba2e7889c2dde496e9272) --- swarm/swarm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'swarm/swarm.go') diff --git a/swarm/swarm.go b/swarm/swarm.go index cb914d39a..5b0e5f177 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -502,7 +502,7 @@ func (self *Swarm) APIs() []rpc.API { { Namespace: "chequebook", Version: chequebook.Version, - Service: chequebook.NewApi(self.config.Swap.Chequebook), + Service: chequebook.NewAPI(self.config.Swap.Chequebook), Public: false, }, { -- cgit v1.2.3 From 7ae2a7bd84e7ee738874916dd07c9f2020c4fabb Mon Sep 17 00:00:00 2001 From: lash Date: Mon, 18 Feb 2019 16:44:50 +0100 Subject: swarm: Reinstate Pss Protocol add call through swarm service (#19117) * swarm: Reinstate Pss Protocol add call through swarm service * swarm: Even less self (cherry picked from commit d88c6ce6b058ccd04b03d079d486b1d55fe5ef61) --- swarm/swarm.go | 107 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 56 insertions(+), 51 deletions(-) (limited to 'swarm/swarm.go') diff --git a/swarm/swarm.go b/swarm/swarm.go index 5b0e5f177..3ab98b3ab 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -344,51 +344,51 @@ Start is called when the stack is started * TODO: start subservices like sword, swear, swarmdns */ // implements the node.Service interface -func (self *Swarm) Start(srv *p2p.Server) error { +func (s *Swarm) Start(srv *p2p.Server) error { startTime := time.Now() - self.tracerClose = tracing.Closer + s.tracerClose = tracing.Closer // update uaddr to correct enode - newaddr := self.bzz.UpdateLocalAddr([]byte(srv.Self().String())) + newaddr := s.bzz.UpdateLocalAddr([]byte(srv.Self().String())) log.Info("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr)) // set chequebook //TODO: Currently if swap is enabled and no chequebook (or inexistent) contract is provided, the node would crash. //Once we integrate back the contracts, this check MUST be revisited - if self.config.SwapEnabled && self.config.SwapAPI != "" { + if s.config.SwapEnabled && s.config.SwapAPI != "" { ctx := context.Background() // The initial setup has no deadline. - err := self.SetChequebook(ctx) + err := s.SetChequebook(ctx) if err != nil { return fmt.Errorf("Unable to set chequebook for SWAP: %v", err) } - log.Debug(fmt.Sprintf("-> cheque book for SWAP: %v", self.config.Swap.Chequebook())) + log.Debug(fmt.Sprintf("-> cheque book for SWAP: %v", s.config.Swap.Chequebook())) } else { log.Debug(fmt.Sprintf("SWAP disabled: no cheque book set")) } log.Info("Starting bzz service") - err := self.bzz.Start(srv) + err := s.bzz.Start(srv) if err != nil { log.Error("bzz failed", "err", err) return err } - log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", self.bzz.Hive.BaseAddr())) + log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", s.bzz.Hive.BaseAddr())) - if self.ps != nil { - self.ps.Start(srv) + if s.ps != nil { + s.ps.Start(srv) } // start swarm http proxy server - if self.config.Port != "" { - addr := net.JoinHostPort(self.config.ListenAddr, self.config.Port) - server := httpapi.NewServer(self.api, self.config.Cors) + if s.config.Port != "" { + addr := net.JoinHostPort(s.config.ListenAddr, s.config.Port) + server := httpapi.NewServer(s.api, s.config.Cors) - if self.config.Cors != "" { - log.Debug("Swarm HTTP proxy CORS headers", "allowedOrigins", self.config.Cors) + if s.config.Cors != "" { + log.Debug("Swarm HTTP proxy CORS headers", "allowedOrigins", s.config.Cors) } - log.Debug("Starting Swarm HTTP proxy", "port", self.config.Port) + log.Debug("Starting Swarm HTTP proxy", "port", s.config.Port) go func() { err := server.ListenAndServe(addr) if err != nil { @@ -399,7 +399,7 @@ func (self *Swarm) Start(srv *p2p.Server) error { doneC := make(chan struct{}) - self.cleanupFuncs = append(self.cleanupFuncs, func() error { + s.cleanupFuncs = append(s.cleanupFuncs, func() error { close(doneC) return nil }) @@ -409,7 +409,7 @@ func (self *Swarm) Start(srv *p2p.Server) error { select { case <-time.After(updateGaugesPeriod): uptimeGauge.Update(time.Since(startTime).Nanoseconds()) - requestsCacheGauge.Update(int64(self.netStore.RequestsCacheLen())) + requestsCacheGauge.Update(int64(s.netStore.RequestsCacheLen())) case <-doneC: return } @@ -417,46 +417,46 @@ func (self *Swarm) Start(srv *p2p.Server) error { }(startTime) startCounter.Inc(1) - self.streamer.Start(srv) + s.streamer.Start(srv) return nil } // implements the node.Service interface // stops all component services. -func (self *Swarm) Stop() error { - if self.tracerClose != nil { - err := self.tracerClose.Close() +func (s *Swarm) Stop() error { + if s.tracerClose != nil { + err := s.tracerClose.Close() if err != nil { return err } } - if self.ps != nil { - self.ps.Stop() + if s.ps != nil { + s.ps.Stop() } - if ch := self.config.Swap.Chequebook(); ch != nil { + if ch := s.config.Swap.Chequebook(); ch != nil { ch.Stop() ch.Save() } - if self.swap != nil { - self.swap.Close() + if s.swap != nil { + s.swap.Close() } - if self.accountingMetrics != nil { - self.accountingMetrics.Close() + if s.accountingMetrics != nil { + s.accountingMetrics.Close() } - if self.netStore != nil { - self.netStore.Close() + if s.netStore != nil { + s.netStore.Close() } - self.sfs.Stop() + s.sfs.Stop() stopCounter.Inc(1) - self.streamer.Stop() + s.streamer.Stop() - err := self.bzz.Stop() - if self.stateStore != nil { - self.stateStore.Close() + err := s.bzz.Stop() + if s.stateStore != nil { + s.stateStore.Close() } - for _, cleanF := range self.cleanupFuncs { + for _, cleanF := range s.cleanupFuncs { err = cleanF() if err != nil { log.Error("encountered an error while running cleanup function", "err", err) @@ -482,68 +482,73 @@ func (s *Swarm) Protocols() (protos []p2p.Protocol) { // implements node.Service // APIs returns the RPC API descriptors the Swarm implementation offers -func (self *Swarm) APIs() []rpc.API { +func (s *Swarm) APIs() []rpc.API { apis := []rpc.API{ // public APIs { Namespace: "bzz", Version: "3.0", - Service: &Info{self.config, chequebook.ContractParams}, + Service: &Info{s.config, chequebook.ContractParams}, Public: true, }, // admin APIs { Namespace: "bzz", Version: "3.0", - Service: api.NewInspector(self.api, self.bzz.Hive, self.netStore), + Service: api.NewInspector(s.api, s.bzz.Hive, s.netStore), Public: false, }, { Namespace: "chequebook", Version: chequebook.Version, - Service: chequebook.NewAPI(self.config.Swap.Chequebook), + Service: chequebook.NewAPI(s.config.Swap.Chequebook), Public: false, }, { Namespace: "swarmfs", Version: fuse.Swarmfs_Version, - Service: self.sfs, + Service: s.sfs, Public: false, }, { Namespace: "accounting", Version: protocols.AccountingVersion, - Service: protocols.NewAccountingApi(self.accountingMetrics), + Service: protocols.NewAccountingApi(s.accountingMetrics), Public: false, }, } - apis = append(apis, self.bzz.APIs()...) + apis = append(apis, s.bzz.APIs()...) - if self.ps != nil { - apis = append(apis, self.ps.APIs()...) + if s.ps != nil { + apis = append(apis, s.ps.APIs()...) } return apis } // SetChequebook ensures that the local checquebook is set up on chain. -func (self *Swarm) SetChequebook(ctx context.Context) error { - err := self.config.Swap.SetChequebook(ctx, self.backend, self.config.Path) +func (s *Swarm) SetChequebook(ctx context.Context) error { + err := s.config.Swap.SetChequebook(ctx, s.backend, s.config.Path) if err != nil { return err } - log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex())) + log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", s.config.Swap.Contract.Hex())) return nil } +// RegisterPssProtocol adds a devp2p protocol to the swarm node's Pss instance +func (s *Swarm) RegisterPssProtocol(topic *pss.Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) { + return pss.RegisterProtocol(s.ps, topic, spec, targetprotocol, options) +} + // serialisable info about swarm type Info struct { *api.Config *chequebook.Params } -func (self *Info) Info() *Info { - return self +func (s *Info) Info() *Info { + return s } -- cgit v1.2.3