aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/swarm.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/swarm.go')
-rw-r--r--swarm/swarm.go178
1 files changed, 99 insertions, 79 deletions
diff --git a/swarm/swarm.go b/swarm/swarm.go
index db52675fd..3ab98b3ab 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -56,7 +56,6 @@ import (
)
var (
- startTime time.Time
updateGaugesPeriod = 5 * time.Second
startCounter = metrics.NewRegisteredCounter("stack,start", nil)
stopCounter = metrics.NewRegisteredCounter("stack,stop", nil)
@@ -80,16 +79,16 @@ type Swarm struct {
swap *swap.Swap
stateStore *state.DBStore
accountingMetrics *protocols.AccountingMetrics
+ cleanupFuncs []func() error
tracerClose io.Closer
}
-// creates a new swarm service instance
+// NewSwarm creates a new swarm service instance
// implements node.Service
// If mockStore is not nil, it will be used as the storage for chunk data.
// MockStore should be used only for testing.
func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) {
-
if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) {
return nil, fmt.Errorf("empty public key")
}
@@ -107,19 +106,21 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
}
self = &Swarm{
- config: config,
- backend: backend,
- privateKey: config.ShiftPrivateKey(),
+ config: config,
+ backend: backend,
+ privateKey: config.ShiftPrivateKey(),
+ cleanupFuncs: []func() error{},
}
log.Debug("Setting up Swarm service components")
config.HiveParams.Discovery = true
bzzconfig := &network.BzzConfig{
- NetworkID: config.NetworkID,
- OverlayAddr: common.FromHex(config.BzzKey),
- HiveParams: config.HiveParams,
- LightNode: config.LightNodeEnabled,
+ NetworkID: config.NetworkID,
+ OverlayAddr: common.FromHex(config.BzzKey),
+ HiveParams: config.HiveParams,
+ LightNode: config.LightNodeEnabled,
+ BootnodeMode: config.BootnodeMode,
}
self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db"))
@@ -343,51 +344,51 @@ Start is called when the stack is started
* TODO: start subservices like sword, swear, swarmdns
*/
// implements the node.Service interface
-func (self *Swarm) Start(srv *p2p.Server) error {
- startTime = time.Now()
+func (s *Swarm) Start(srv *p2p.Server) error {
+ startTime := time.Now()
- self.tracerClose = tracing.Closer
+ s.tracerClose = tracing.Closer
// update uaddr to correct enode
- newaddr := self.bzz.UpdateLocalAddr([]byte(srv.Self().String()))
+ newaddr := s.bzz.UpdateLocalAddr([]byte(srv.Self().String()))
log.Info("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr))
// set chequebook
//TODO: Currently if swap is enabled and no chequebook (or inexistent) contract is provided, the node would crash.
//Once we integrate back the contracts, this check MUST be revisited
- if self.config.SwapEnabled && self.config.SwapAPI != "" {
+ if s.config.SwapEnabled && s.config.SwapAPI != "" {
ctx := context.Background() // The initial setup has no deadline.
- err := self.SetChequebook(ctx)
+ err := s.SetChequebook(ctx)
if err != nil {
return fmt.Errorf("Unable to set chequebook for SWAP: %v", err)
}
- log.Debug(fmt.Sprintf("-> cheque book for SWAP: %v", self.config.Swap.Chequebook()))
+ log.Debug(fmt.Sprintf("-> cheque book for SWAP: %v", s.config.Swap.Chequebook()))
} else {
log.Debug(fmt.Sprintf("SWAP disabled: no cheque book set"))
}
log.Info("Starting bzz service")
- err := self.bzz.Start(srv)
+ err := s.bzz.Start(srv)
if err != nil {
log.Error("bzz failed", "err", err)
return err
}
- log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", self.bzz.Hive.BaseAddr()))
+ log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", s.bzz.Hive.BaseAddr()))
- if self.ps != nil {
- self.ps.Start(srv)
+ if s.ps != nil {
+ s.ps.Start(srv)
}
// start swarm http proxy server
- if self.config.Port != "" {
- addr := net.JoinHostPort(self.config.ListenAddr, self.config.Port)
- server := httpapi.NewServer(self.api, self.config.Cors)
+ if s.config.Port != "" {
+ addr := net.JoinHostPort(s.config.ListenAddr, s.config.Port)
+ server := httpapi.NewServer(s.api, s.config.Cors)
- if self.config.Cors != "" {
- log.Debug("Swarm HTTP proxy CORS headers", "allowedOrigins", self.config.Cors)
+ if s.config.Cors != "" {
+ log.Debug("Swarm HTTP proxy CORS headers", "allowedOrigins", s.config.Cors)
}
- log.Debug("Starting Swarm HTTP proxy", "port", self.config.Port)
+ log.Debug("Starting Swarm HTTP proxy", "port", s.config.Port)
go func() {
err := server.ListenAndServe(addr)
if err != nil {
@@ -396,139 +397,158 @@ func (self *Swarm) Start(srv *p2p.Server) error {
}()
}
- self.periodicallyUpdateGauges()
-
- startCounter.Inc(1)
- self.streamer.Start(srv)
- return nil
-}
+ doneC := make(chan struct{})
-func (self *Swarm) periodicallyUpdateGauges() {
- ticker := time.NewTicker(updateGaugesPeriod)
+ s.cleanupFuncs = append(s.cleanupFuncs, func() error {
+ close(doneC)
+ return nil
+ })
- go func() {
- for range ticker.C {
- self.updateGauges()
+ go func(time.Time) {
+ for {
+ select {
+ case <-time.After(updateGaugesPeriod):
+ uptimeGauge.Update(time.Since(startTime).Nanoseconds())
+ requestsCacheGauge.Update(int64(s.netStore.RequestsCacheLen()))
+ case <-doneC:
+ return
+ }
}
- }()
-}
+ }(startTime)
-func (self *Swarm) updateGauges() {
- uptimeGauge.Update(time.Since(startTime).Nanoseconds())
- requestsCacheGauge.Update(int64(self.netStore.RequestsCacheLen()))
+ startCounter.Inc(1)
+ s.streamer.Start(srv)
+ return nil
}
// implements the node.Service interface
// stops all component services.
-func (self *Swarm) Stop() error {
- if self.tracerClose != nil {
- err := self.tracerClose.Close()
+func (s *Swarm) Stop() error {
+ if s.tracerClose != nil {
+ err := s.tracerClose.Close()
if err != nil {
return err
}
}
- if self.ps != nil {
- self.ps.Stop()
+ if s.ps != nil {
+ s.ps.Stop()
}
- if ch := self.config.Swap.Chequebook(); ch != nil {
+ if ch := s.config.Swap.Chequebook(); ch != nil {
ch.Stop()
ch.Save()
}
- if self.swap != nil {
- self.swap.Close()
+ if s.swap != nil {
+ s.swap.Close()
}
- if self.accountingMetrics != nil {
- self.accountingMetrics.Close()
+ if s.accountingMetrics != nil {
+ s.accountingMetrics.Close()
}
- if self.netStore != nil {
- self.netStore.Close()
+ if s.netStore != nil {
+ s.netStore.Close()
}
- self.sfs.Stop()
+ s.sfs.Stop()
stopCounter.Inc(1)
- self.streamer.Stop()
+ s.streamer.Stop()
- err := self.bzz.Stop()
- if self.stateStore != nil {
- self.stateStore.Close()
+ err := s.bzz.Stop()
+ if s.stateStore != nil {
+ s.stateStore.Close()
+ }
+
+ for _, cleanF := range s.cleanupFuncs {
+ err = cleanF()
+ if err != nil {
+ log.Error("encountered an error while running cleanup function", "err", err)
+ break
+ }
}
return err
}
-// implements the node.Service interface
-func (self *Swarm) Protocols() (protos []p2p.Protocol) {
- protos = append(protos, self.bzz.Protocols()...)
+// Protocols implements the node.Service interface
+func (s *Swarm) Protocols() (protos []p2p.Protocol) {
+ if s.config.BootnodeMode {
+ protos = append(protos, s.bzz.Protocols()...)
+ } else {
+ protos = append(protos, s.bzz.Protocols()...)
- if self.ps != nil {
- protos = append(protos, self.ps.Protocols()...)
+ if s.ps != nil {
+ protos = append(protos, s.ps.Protocols()...)
+ }
}
return
}
// implements node.Service
// APIs returns the RPC API descriptors the Swarm implementation offers
-func (self *Swarm) APIs() []rpc.API {
+func (s *Swarm) APIs() []rpc.API {
apis := []rpc.API{
// public APIs
{
Namespace: "bzz",
Version: "3.0",
- Service: &Info{self.config, chequebook.ContractParams},
+ Service: &Info{s.config, chequebook.ContractParams},
Public: true,
},
// admin APIs
{
Namespace: "bzz",
Version: "3.0",
- Service: api.NewControl(self.api, self.bzz.Hive),
+ Service: api.NewInspector(s.api, s.bzz.Hive, s.netStore),
Public: false,
},
{
Namespace: "chequebook",
Version: chequebook.Version,
- Service: chequebook.NewApi(self.config.Swap.Chequebook),
+ Service: chequebook.NewAPI(s.config.Swap.Chequebook),
Public: false,
},
{
Namespace: "swarmfs",
Version: fuse.Swarmfs_Version,
- Service: self.sfs,
+ Service: s.sfs,
Public: false,
},
{
Namespace: "accounting",
Version: protocols.AccountingVersion,
- Service: protocols.NewAccountingApi(self.accountingMetrics),
+ Service: protocols.NewAccountingApi(s.accountingMetrics),
Public: false,
},
}
- apis = append(apis, self.bzz.APIs()...)
+ apis = append(apis, s.bzz.APIs()...)
- if self.ps != nil {
- apis = append(apis, self.ps.APIs()...)
+ if s.ps != nil {
+ apis = append(apis, s.ps.APIs()...)
}
return apis
}
// SetChequebook ensures that the local checquebook is set up on chain.
-func (self *Swarm) SetChequebook(ctx context.Context) error {
- err := self.config.Swap.SetChequebook(ctx, self.backend, self.config.Path)
+func (s *Swarm) SetChequebook(ctx context.Context) error {
+ err := s.config.Swap.SetChequebook(ctx, s.backend, s.config.Path)
if err != nil {
return err
}
- log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex()))
+ log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", s.config.Swap.Contract.Hex()))
return nil
}
+// RegisterPssProtocol adds a devp2p protocol to the swarm node's Pss instance
+func (s *Swarm) RegisterPssProtocol(topic *pss.Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) {
+ return pss.RegisterProtocol(s.ps, topic, spec, targetprotocol, options)
+}
+
// serialisable info about swarm
type Info struct {
*api.Config
*chequebook.Params
}
-func (self *Info) Info() *Info {
- return self
+func (s *Info) Info() *Info {
+ return s
}