diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/swarm.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/swarm.go')
-rw-r--r-- | swarm/swarm.go | 340 |
1 files changed, 207 insertions, 133 deletions
diff --git a/swarm/swarm.go b/swarm/swarm.go index 0a120db1f..90360264e 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -23,6 +23,7 @@ import ( "fmt" "math/big" "net" + "path/filepath" "strings" "time" "unicode" @@ -31,20 +32,24 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/contracts/chequebook" "github.com/ethereum/go-ethereum/contracts/ens" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/api" httpapi "github.com/ethereum/go-ethereum/swarm/api/http" "github.com/ethereum/go-ethereum/swarm/fuse" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/stream" + "github.com/ethereum/go-ethereum/swarm/pss" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + "github.com/ethereum/go-ethereum/swarm/storage/mru" ) var ( @@ -53,31 +58,28 @@ var ( startCounter = metrics.NewRegisteredCounter("stack,start", nil) stopCounter = metrics.NewRegisteredCounter("stack,stop", nil) uptimeGauge = metrics.NewRegisteredGauge("stack.uptime", nil) - dbSizeGauge = metrics.NewRegisteredGauge("storage.db.chunks.size", nil) - cacheSizeGauge = metrics.NewRegisteredGauge("storage.db.cache.size", nil) + requestsCacheGauge = metrics.NewRegisteredGauge("storage.cache.requests.size", nil) ) // the swarm stack type Swarm struct { - config *api.Config // swarm configuration - api *api.Api // high level api layer (fs/manifest) - dns api.Resolver // DNS registrar - dbAccess *network.DbAccess // access to local chunk db iterator and storage counter - storage storage.ChunkStore // internal access to storage, common interface to cloud storage backends - dpa *storage.DPA // distributed preimage archive, the local API to the storage with document level storage/retrieval support - depo network.StorageHandler // remote request handler, interface between bzz protocol and the storage - cloud storage.CloudStore // procurement, cloud storage backend (can multi-cloud) - hive *network.Hive // the logistic manager - backend chequebook.Backend // simple blockchain Backend + config *api.Config // swarm configuration + api *api.API // high level api layer (fs/manifest) + dns api.Resolver // DNS registrar + fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support + streamer *stream.Registry + bzz *network.Bzz // the logistic manager + backend chequebook.Backend // simple blockchain Backend privateKey *ecdsa.PrivateKey corsString string swapEnabled bool lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit + ps *pss.Pss } type SwarmAPI struct { - Api *api.Api + Api *api.API Backend chequebook.Backend PrvKey *ecdsa.PrivateKey } @@ -92,77 +94,147 @@ func (self *Swarm) API() *SwarmAPI { // creates a new swarm service instance // implements node.Service -func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.Config) (self *Swarm, err error) { - if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroKey) { +// If mockStore is not nil, it will be used as the storage for chunk data. +// MockStore should be used only for testing. +func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) { + + if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) { return nil, fmt.Errorf("empty public key") } - if bytes.Equal(common.FromHex(config.BzzKey), storage.ZeroKey) { + if bytes.Equal(common.FromHex(config.BzzKey), storage.ZeroAddr) { return nil, fmt.Errorf("empty bzz key") } - self = &Swarm{ - config: config, - swapEnabled: config.SwapEnabled, - backend: backend, - privateKey: config.Swap.PrivateKey(), - corsString: config.Cors, + var backend chequebook.Backend + if config.SwapAPI != "" && config.SwapEnabled { + log.Info("connecting to SWAP API", "url", config.SwapAPI) + backend, err = ethclient.Dial(config.SwapAPI) + if err != nil { + return nil, fmt.Errorf("error connecting to SWAP API %s: %s", config.SwapAPI, err) + } } - log.Debug(fmt.Sprintf("Setting up Swarm service components")) - hash := storage.MakeHashFunc(config.ChunkerParams.Hash) - self.lstore, err = storage.NewLocalStore(hash, config.StoreParams) - if err != nil { - return + self = &Swarm{ + config: config, + backend: backend, + privateKey: config.ShiftPrivateKey(), } + log.Debug(fmt.Sprintf("Setting up Swarm service components")) - // setup local store - log.Debug(fmt.Sprintf("Set up local storage")) - - self.dbAccess = network.NewDbAccess(self.lstore) - log.Debug(fmt.Sprintf("Set up local db access (iterator/counter)")) - - // set up the kademlia hive - self.hive = network.NewHive( - common.HexToHash(self.config.BzzKey), // key to hive (kademlia base address) - config.HiveParams, // configuration parameters - config.SwapEnabled, // SWAP enabled - config.SyncEnabled, // syncronisation enabled - ) - log.Debug(fmt.Sprintf("Set up swarm network with Kademlia hive")) - - // setup cloud storage backend - self.cloud = network.NewForwarder(self.hive) - log.Debug(fmt.Sprintf("-> set swarm forwarder as cloud storage backend")) + config.HiveParams.Discovery = true - // setup cloud storage internal access layer - self.storage = storage.NewNetStore(hash, self.lstore, self.cloud, config.StoreParams) log.Debug(fmt.Sprintf("-> swarm net store shared access layer to Swarm Chunk Store")) - // set up Depo (storage handler = cloud storage access layer for incoming remote requests) - self.depo = network.NewDepo(hash, self.lstore, self.storage) - log.Debug(fmt.Sprintf("-> REmote Access to CHunks")) + nodeID, err := discover.HexID(config.NodeID) + if err != nil { + return nil, err + } + addr := &network.BzzAddr{ + OAddr: common.FromHex(config.BzzKey), + UAddr: []byte(discover.NewNode(nodeID, net.IP{127, 0, 0, 1}, 30303, 30303).String()), + } + + bzzconfig := &network.BzzConfig{ + NetworkID: config.NetworkID, + OverlayAddr: addr.OAddr, + UnderlayAddr: addr.UAddr, + HiveParams: config.HiveParams, + } - // set up DPA, the cloud storage local access layer - dpaChunkStore := storage.NewDpaChunkStore(self.lstore, self.storage) - log.Debug(fmt.Sprintf("-> Local Access to Swarm")) - // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - self.dpa = storage.NewDPA(dpaChunkStore, self.config.ChunkerParams) - log.Debug(fmt.Sprintf("-> Content Store API")) + stateStore, err := state.NewDBStore(filepath.Join(config.Path, "state-store.db")) + if err != nil { + return + } + // set up high level api + var resolver *api.MultiResolver if len(config.EnsAPIs) > 0 { opts := []api.MultiResolverOption{} for _, c := range config.EnsAPIs { tld, endpoint, addr := parseEnsAPIAddress(c) - r, err := newEnsClient(endpoint, addr, config) + r, err := newEnsClient(endpoint, addr, config, self.privateKey) if err != nil { return nil, err } opts = append(opts, api.MultiResolverOptionWithResolver(r, tld)) + } - self.dns = api.NewMultiResolver(opts...) + resolver = api.NewMultiResolver(opts...) + self.dns = resolver + } + + self.lstore, err = storage.NewLocalStore(config.LocalStoreParams, mockStore) + if err != nil { + return + } + + db := storage.NewDBAPI(self.lstore) + to := network.NewKademlia( + common.FromHex(config.BzzKey), + network.NewKadParams(), + ) + delivery := stream.NewDelivery(to, db) + + self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{ + SkipCheck: config.DeliverySkipCheck, + DoSync: config.SyncEnabled, + DoRetrieve: true, + SyncUpdateDelay: config.SyncUpdateDelay, + }) + + // set up NetStore, the cloud storage local access layer + netStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve) + // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage + self.fileStore = storage.NewFileStore(netStore, self.config.FileStoreParams) + + var resourceHandler *mru.Handler + rhparams := &mru.HandlerParams{ + // TODO: config parameter to set limits + QueryMaxPeriods: &mru.LookupParams{ + Limit: false, + }, + Signer: &mru.GenericSigner{ + PrivKey: self.privateKey, + }, + } + if resolver != nil { + resolver.SetNameHash(ens.EnsNode) + // Set HeaderGetter and OwnerValidator interfaces to resolver only if it is not nil. + rhparams.HeaderGetter = resolver + rhparams.OwnerValidator = resolver + } else { + log.Warn("No ETH API specified, resource updates will use block height approximation") + // TODO: blockestimator should use saved values derived from last time ethclient was connected + rhparams.HeaderGetter = mru.NewBlockEstimator() + } + resourceHandler, err = mru.NewHandler(rhparams) + if err != nil { + return nil, err + } + resourceHandler.SetStore(netStore) + + var validators []storage.ChunkValidator + validators = append(validators, storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash))) + if resourceHandler != nil { + validators = append(validators, resourceHandler) + } + self.lstore.Validators = validators + + // setup local store + log.Debug(fmt.Sprintf("Set up local storage")) + + self.bzz = network.NewBzz(bzzconfig, to, stateStore, stream.Spec, self.streamer.Run) + + // Pss = postal service over swarm (devp2p over bzz) + self.ps, err = pss.NewPss(to, config.Pss) + if err != nil { + return nil, err + } + if pss.IsActiveHandshake { + pss.SetHandshakeController(self.ps, pss.NewHandshakeParams()) } - self.api = api.NewApi(self.dpa, self.dns) + self.api = api.NewAPI(self.fileStore, self.dns, resourceHandler) // Manifests for Smart Hosting log.Debug(fmt.Sprintf("-> Web3 virtual server API")) @@ -198,16 +270,22 @@ func parseEnsAPIAddress(s string) (tld, endpoint string, addr common.Address) { return } +// ensClient provides functionality for api.ResolveValidator +type ensClient struct { + *ens.ENS + *ethclient.Client +} + // newEnsClient creates a new ENS client for that is a consumer of // a ENS API on a specific endpoint. It is used as a helper function // for creating multiple resolvers in NewSwarm function. -func newEnsClient(endpoint string, addr common.Address, config *api.Config) (*ens.ENS, error) { +func newEnsClient(endpoint string, addr common.Address, config *api.Config, privkey *ecdsa.PrivateKey) (*ensClient, error) { log.Info("connecting to ENS API", "url", endpoint) client, err := rpc.Dial(endpoint) if err != nil { return nil, fmt.Errorf("error connecting to ENS API %s: %s", endpoint, err) } - ensClient := ethclient.NewClient(client) + ethClient := ethclient.NewClient(client) ensRoot := config.EnsRoot if addr != (common.Address{}) { @@ -220,13 +298,16 @@ func newEnsClient(endpoint string, addr common.Address, config *api.Config) (*en log.Warn(fmt.Sprintf("could not determine ENS contract address, using default %s", ensRoot), "err", err) } } - transactOpts := bind.NewKeyedTransactor(config.Swap.PrivateKey()) - dns, err := ens.NewENS(transactOpts, ensRoot, ensClient) + transactOpts := bind.NewKeyedTransactor(privkey) + dns, err := ens.NewENS(transactOpts, ensRoot, ethClient) if err != nil { return nil, err } log.Debug(fmt.Sprintf("-> Swarm Domain Name Registrar %v @ address %v", endpoint, ensRoot.Hex())) - return dns, err + return &ensClient{ + ENS: dns, + Client: ethClient, + }, err } // detectEnsAddr determines the ENS contract address by getting both the @@ -274,16 +355,12 @@ Start is called when the stack is started // implements the node.Service interface func (self *Swarm) Start(srv *p2p.Server) error { startTime = time.Now() - connectPeer := func(url string) error { - node, err := discover.ParseNode(url) - if err != nil { - return fmt.Errorf("invalid node URL: %v", err) - } - srv.AddPeer(node) - return nil - } + + // update uaddr to correct enode + newaddr := self.bzz.UpdateLocalAddr([]byte(srv.Self().String())) + log.Warn("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr)) // set chequebook - if self.swapEnabled { + if self.config.SwapEnabled { ctx := context.Background() // The initial setup has no deadline. err := self.SetChequebook(ctx) if err != nil { @@ -295,33 +372,38 @@ func (self *Swarm) Start(srv *p2p.Server) error { } log.Warn(fmt.Sprintf("Starting Swarm service")) - self.hive.Start( - discover.PubkeyID(&srv.PrivateKey.PublicKey), - func() string { return srv.ListenAddr }, - connectPeer, - ) - log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.hive.Addr())) - self.dpa.Start() - log.Debug(fmt.Sprintf("Swarm DPA started")) + err := self.bzz.Start(srv) + if err != nil { + log.Error("bzz failed", "err", err) + return err + } + log.Info(fmt.Sprintf("Swarm network started on bzz address: %x", self.bzz.Hive.Overlay.BaseAddr())) + + if self.ps != nil { + self.ps.Start(srv) + log.Info("Pss started") + } // start swarm http proxy server if self.config.Port != "" { addr := net.JoinHostPort(self.config.ListenAddr, self.config.Port) - go httpapi.StartHttpServer(self.api, &httpapi.ServerConfig{ + go httpapi.StartHTTPServer(self.api, &httpapi.ServerConfig{ Addr: addr, - CorsString: self.corsString, + CorsString: self.config.Cors, }) - log.Info(fmt.Sprintf("Swarm http proxy started on %v", addr)) + } - if self.corsString != "" { - log.Debug(fmt.Sprintf("Swarm http proxy started with corsdomain: %v", self.corsString)) - } + log.Debug(fmt.Sprintf("Swarm http proxy started on port: %v", self.config.Port)) + + if self.config.Cors != "" { + log.Debug(fmt.Sprintf("Swarm http proxy started with corsdomain: %v", self.config.Cors)) } self.periodicallyUpdateGauges() startCounter.Inc(1) + self.streamer.Start(srv) return nil } @@ -336,16 +418,16 @@ func (self *Swarm) periodicallyUpdateGauges() { } func (self *Swarm) updateGauges() { - dbSizeGauge.Update(int64(self.lstore.DbCounter())) - cacheSizeGauge.Update(int64(self.lstore.CacheCounter())) uptimeGauge.Update(time.Since(startTime).Nanoseconds()) + requestsCacheGauge.Update(int64(self.lstore.RequestsCacheLen())) } // implements the node.Service interface // stops all component services. func (self *Swarm) Stop() error { - self.dpa.Stop() - err := self.hive.Stop() + if self.ps != nil { + self.ps.Stop() + } if ch := self.config.Swap.Chequebook(); ch != nil { ch.Stop() ch.Save() @@ -356,34 +438,45 @@ func (self *Swarm) Stop() error { } self.sfs.Stop() stopCounter.Inc(1) - return err + self.streamer.Stop() + return self.bzz.Stop() } // implements the node.Service interface -func (self *Swarm) Protocols() []p2p.Protocol { - proto, err := network.Bzz(self.depo, self.backend, self.hive, self.dbAccess, self.config.Swap, self.config.SyncParams, self.config.NetworkId) - if err != nil { - return nil +func (self *Swarm) Protocols() (protos []p2p.Protocol) { + protos = append(protos, self.bzz.Protocols()...) + + if self.ps != nil { + protos = append(protos, self.ps.Protocols()...) + } + return +} + +func (self *Swarm) RegisterPssProtocol(spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) { + if !pss.IsActiveProtocol { + return nil, fmt.Errorf("Pss protocols not available (built with !nopssprotocol tag)") } - return []p2p.Protocol{proto} + topic := pss.ProtocolTopic(spec) + return pss.RegisterProtocol(self.ps, &topic, spec, targetprotocol, options) } // implements node.Service -// Apis returns the RPC Api descriptors the Swarm implementation offers +// APIs returns the RPC API descriptors the Swarm implementation offers func (self *Swarm) APIs() []rpc.API { - return []rpc.API{ + + apis := []rpc.API{ // public APIs { Namespace: "bzz", - Version: "0.1", + Version: "3.0", Service: &Info{self.config, chequebook.ContractParams}, Public: true, }, // admin APIs { Namespace: "bzz", - Version: "0.1", - Service: api.NewControl(self.api, self.hive), + Version: "3.0", + Service: api.NewControl(self.api, self.bzz.Hive), Public: false, }, { @@ -414,9 +507,17 @@ func (self *Swarm) APIs() []rpc.API { }, // {Namespace, Version, api.NewAdmin(self), false}, } + + apis = append(apis, self.bzz.APIs()...) + + if self.ps != nil { + apis = append(apis, self.ps.APIs()...) + } + + return apis } -func (self *Swarm) Api() *api.Api { +func (self *Swarm) Api() *api.API { return self.api } @@ -427,36 +528,9 @@ func (self *Swarm) SetChequebook(ctx context.Context) error { return err } log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex())) - self.hive.DropAll() return nil } -// Local swarm without netStore -func NewLocalSwarm(datadir, port string) (self *Swarm, err error) { - - prvKey, err := crypto.GenerateKey() - if err != nil { - return - } - - config := api.NewDefaultConfig() - config.Path = datadir - config.Init(prvKey) - config.Port = port - - dpa, err := storage.NewLocalDPA(datadir) - if err != nil { - return - } - - self = &Swarm{ - api: api.NewApi(dpa, nil), - config: config, - } - - return -} - // serialisable info about swarm type Info struct { *api.Config |