From 3e1000fda3424d880bc43ebbb16d8a33447d4182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 26 Nov 2015 18:35:44 +0200 Subject: cmd, eth, node, rpc, xeth: use single-instance services --- cmd/geth/js.go | 4 +- cmd/geth/js_test.go | 14 +-- cmd/geth/main.go | 2 +- cmd/gethrpctest/main.go | 6 +- cmd/utils/flags.go | 10 +- eth/backend.go | 4 +- node/errors.go | 20 +++- node/node.go | 126 ++++++++----------------- node/node_example_test.go | 2 +- node/node_test.go | 228 ++++++++++++++++++++-------------------------- node/service.go | 38 +++----- node/service_test.go | 37 +++----- node/utils_test.go | 117 ++++++++++++++++++++++++ rpc/api/admin.go | 2 +- rpc/api/utils.go | 2 +- xeth/state.go | 3 +- xeth/xeth.go | 8 +- 17 files changed, 329 insertions(+), 294 deletions(-) create mode 100644 node/utils_test.go diff --git a/cmd/geth/js.go b/cmd/geth/js.go index a0e8bdb21..f1845d94f 100644 --- a/cmd/geth/js.go +++ b/cmd/geth/js.go @@ -345,7 +345,7 @@ func (self *jsre) AskPassword() (string, bool) { func (self *jsre) ConfirmTransaction(tx string) bool { // Retrieve the Ethereum instance from the node var ethereum *eth.Ethereum - if _, err := self.stack.SingletonService(ðereum); err != nil { + if err := self.stack.Service(ðereum); err != nil { return false } // If natspec is enabled, ask for permission @@ -367,7 +367,7 @@ func (self *jsre) UnlockAccount(addr []byte) bool { } // TODO: allow retry var ethereum *eth.Ethereum - if _, err := self.stack.SingletonService(ðereum); err != nil { + if err := self.stack.Service(ðereum); err != nil { return false } if err := ethereum.AccountManager().Unlock(common.BytesToAddress(addr), pass); err != nil { diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index ed4d04b48..a0f3f2fb7 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -68,7 +68,7 @@ type testjethre struct { func (self *testjethre) UnlockAccount(acc []byte) bool { var ethereum *eth.Ethereum - self.stack.SingletonService(ðereum) + self.stack.Service(ðereum) err := ethereum.AccountManager().Unlock(common.BytesToAddress(acc), "") if err != nil { @@ -79,7 +79,7 @@ func (self *testjethre) UnlockAccount(acc []byte) bool { func (self *testjethre) ConfirmTransaction(tx string) bool { var ethereum *eth.Ethereum - self.stack.SingletonService(ðereum) + self.stack.Service(ðereum) if ethereum.NatSpec { self.lastConfirm = natspec.GetNotice(self.xeth, tx, self.client) @@ -118,7 +118,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod if config != nil { config(ethConf) } - if err := stack.Register("ethereum", func(ctx *node.ServiceContext) (node.Service, error) { return eth.New(ctx, ethConf) }); err != nil { + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return eth.New(ctx, ethConf) }); err != nil { t.Fatalf("failed to register ethereum protocol: %v", err) } // Initialize all the keys for testing @@ -138,7 +138,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod t.Fatalf("failed to start test stack: %v", err) } var ethereum *eth.Ethereum - stack.SingletonService(ðereum) + stack.Service(ðereum) assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext") client := comms.NewInProcClient(codec.JSON) @@ -202,7 +202,7 @@ func TestBlockChain(t *testing.T) { tmpfileq := strconv.Quote(tmpfile) var ethereum *eth.Ethereum - node.SingletonService(ðereum) + node.Service(ðereum) ethereum.BlockChain().Reset() checkEvalJSON(t, repl, `admin.exportChain(`+tmpfileq+`)`, `true`) @@ -436,7 +436,7 @@ multiply7 = Multiply7.at(contractaddress); func pendingTransactions(repl *testjethre, t *testing.T) (txc int64, err error) { var ethereum *eth.Ethereum - repl.stack.SingletonService(ðereum) + repl.stack.Service(ðereum) txs := ethereum.TxPool().GetTransactions() return int64(len(txs)), nil @@ -464,7 +464,7 @@ func processTxs(repl *testjethre, t *testing.T, expTxc int) bool { return false } var ethereum *eth.Ethereum - repl.stack.SingletonService(ðereum) + repl.stack.Service(ðereum) err = ethereum.StartMining(runtime.NumCPU(), "") if err != nil { diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 6fac4f458..3a5471845 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -489,7 +489,7 @@ func startNode(ctx *cli.Context, stack *node.Node) { // Unlock any account specifically requested var ethereum *eth.Ethereum - if _, err := stack.SingletonService(ðereum); err != nil { + if err := stack.Service(ðereum); err != nil { utils.Fatalf("ethereum service not running: %v", err) } accman := ethereum.AccountManager() diff --git a/cmd/gethrpctest/main.go b/cmd/gethrpctest/main.go index cb4c7aece..7130980ac 100644 --- a/cmd/gethrpctest/main.go +++ b/cmd/gethrpctest/main.go @@ -126,11 +126,11 @@ func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node TestGenesisBlock: test.Genesis, AccountManager: accman, } - if err := stack.Register("ethereum", func(ctx *node.ServiceContext) (node.Service, error) { return eth.New(ctx, ethConf) }); err != nil { + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return eth.New(ctx, ethConf) }); err != nil { return nil, err } // Initialize and register the Whisper protocol - if err := stack.Register("whisper", func(*node.ServiceContext) (node.Service, error) { return whisper.New(), nil }); err != nil { + if err := stack.Register(func(*node.ServiceContext) (node.Service, error) { return whisper.New(), nil }); err != nil { return nil, err } return stack, nil @@ -140,7 +140,7 @@ func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node // stack to ensure basic checks pass before running RPC tests. func RunTest(stack *node.Node, test *tests.BlockTest) error { var ethereum *eth.Ethereum - stack.SingletonService(ðereum) + stack.Service(ðereum) blockchain := ethereum.BlockChain() // Process the blocks and verify the imported headers diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 30570d930..53126f9e5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -686,13 +686,13 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. if err != nil { Fatalf("Failed to create the protocol stack: %v", err) } - if err := stack.Register("eth", func(ctx *node.ServiceContext) (node.Service, error) { + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return eth.New(ctx, ethConf) }); err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } if shhEnable { - if err := stack.Register("shh", func(*node.ServiceContext) (node.Service, error) { return whisper.New(), nil }); err != nil { + if err := stack.Register(func(*node.ServiceContext) (node.Service, error) { return whisper.New(), nil }); err != nil { Fatalf("Failed to register the Whisper service: %v", err) } } @@ -786,7 +786,11 @@ func StartIPC(stack *node.Node, ctx *cli.Context) error { } initializer := func(conn net.Conn) (comms.Stopper, shared.EthereumApi, error) { - fe := useragent.NewRemoteFrontend(conn, stack.Service("eth").(*eth.Ethereum).AccountManager()) + var ethereum *eth.Ethereum + if err := stack.Service(ðereum); err != nil { + return nil, nil, err + } + fe := useragent.NewRemoteFrontend(conn, ethereum.AccountManager()) xeth := xeth.New(stack, fe) apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec.JSON, xeth, stack) if err != nil { diff --git a/eth/backend.go b/eth/backend.go index 5a8cf6a73..0369f6afd 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -135,7 +135,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ethdb.OpenFileLimit = 128 / (dbCount + 1) // Open the chain database and perform any upgrades needed - chainDb, err := ctx.Database("chaindata", config.DatabaseCache) + chainDb, err := ctx.OpenDatabase("chaindata", config.DatabaseCache) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { return nil, err } - dappDb, err := ctx.Database("dapp", config.DatabaseCache) + dappDb, err := ctx.OpenDatabase("dapp", config.DatabaseCache) if err != nil { return nil, err } diff --git a/node/errors.go b/node/errors.go index f8eece06d..bd5ddeb5d 100644 --- a/node/errors.go +++ b/node/errors.go @@ -16,13 +16,27 @@ package node -import "fmt" +import ( + "fmt" + "reflect" +) -// StopError is returned if a node fails to stop either any of its registered +// DuplicateServiceError is returned during Node startup if a registered service +// constructor returns a service of the same type that was already started. +type DuplicateServiceError struct { + Kind reflect.Type +} + +// Error generates a textual representation of the duplicate service error. +func (e *DuplicateServiceError) Error() string { + return fmt.Sprintf("duplicate service: %v", e.Kind) +} + +// StopError is returned if a Node fails to stop either any of its registered // services or itself. type StopError struct { Server error - Services map[string]error + Services map[reflect.Type]error } // Error generates a textual representation of the stop error. diff --git a/node/node.go b/node/node.go index 023f77403..5566bc44b 100644 --- a/node/node.go +++ b/node/node.go @@ -30,16 +30,16 @@ import ( ) var ( - ErrDatadirUsed = errors.New("datadir already used") - ErrNodeStopped = errors.New("node not started") - ErrNodeRunning = errors.New("node already running") - ErrServiceUnknown = errors.New("service not registered") - ErrServiceRegistered = errors.New("service already registered") + ErrDatadirUsed = errors.New("datadir already used") + ErrNodeStopped = errors.New("node not started") + ErrNodeRunning = errors.New("node already running") + ErrServiceUnknown = errors.New("unknown service") datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} ) -// Node represents a P2P node into which arbitrary services might be registered. +// Node represents a P2P node into which arbitrary (uniquely typed) services might +// be registered. type Node struct { datadir string // Path to the currently used data directory eventmux *event.TypeMux // Event multiplexer used between the services of a stack @@ -47,9 +47,8 @@ type Node struct { serverConfig *p2p.Server // Configuration of the underlying P2P networking layer server *p2p.Server // Currently running P2P networking layer - serviceIndex map[string]ServiceConstructor // Set of services currently registered in the node - serviceOrder []string // Service construction order to handle dependencies - services map[string]Service // Currently running services + serviceFuncs []ServiceConstructor // Service constructors (in dependency order) + services map[reflect.Type]Service // Currently running services stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex @@ -85,52 +84,21 @@ func New(conf *Config) (*Node, error) { MaxPeers: conf.MaxPeers, MaxPendingPeers: conf.MaxPendingPeers, }, - serviceIndex: make(map[string]ServiceConstructor), - serviceOrder: []string{}, + serviceFuncs: []ServiceConstructor{}, eventmux: new(event.TypeMux), }, nil } -// Register injects a new service into the node's stack. -func (n *Node) Register(id string, constructor ServiceConstructor) error { +// Register injects a new service into the node's stack. The service created by +// the passed constructor must be unique in its type with regard to sibling ones. +func (n *Node) Register(constructor ServiceConstructor) error { n.lock.Lock() defer n.lock.Unlock() - // Short circuit if the node is running or if the id is taken if n.server != nil { return ErrNodeRunning } - if _, ok := n.serviceIndex[id]; ok { - return ErrServiceRegistered - } - // Otherwise register the service and return - n.serviceOrder = append(n.serviceOrder, id) - n.serviceIndex[id] = constructor - - return nil -} - -// Unregister removes a service from a node's stack. If the node is currently -// running, an error will be returned. -func (n *Node) Unregister(id string) error { - n.lock.Lock() - defer n.lock.Unlock() - - // Short circuit if the node is running, or if the service is unknown - if n.server != nil { - return ErrNodeRunning - } - if _, ok := n.serviceIndex[id]; !ok { - return ErrServiceUnknown - } - // Otherwise drop the service and return - delete(n.serviceIndex, id) - for i, service := range n.serviceOrder { - if service == id { - n.serviceOrder = append(n.serviceOrder[:i], n.serviceOrder[i+1:]...) - break - } - } + n.serviceFuncs = append(n.serviceFuncs, constructor) return nil } @@ -147,25 +115,27 @@ func (n *Node) Start() error { running := new(p2p.Server) *running = *n.serverConfig - services := make(map[string]Service) - for _, id := range n.serviceOrder { - constructor := n.serviceIndex[id] - + services := make(map[reflect.Type]Service) + for _, constructor := range n.serviceFuncs { // Create a new context for the particular service ctx := &ServiceContext{ datadir: n.datadir, - services: make(map[string]Service), + services: make(map[reflect.Type]Service), EventMux: n.eventmux, } - for id, s := range services { // copy needed for threaded access - ctx.services[id] = s + for kind, s := range services { // copy needed for threaded access + ctx.services[kind] = s } // Construct and save the service service, err := constructor(ctx) if err != nil { return err } - services[id] = service + kind := reflect.TypeOf(service) + if _, exists := services[kind]; exists { + return &DuplicateServiceError{Kind: kind} + } + services[kind] = service } // Gather the protocols and start the freshly assembled P2P server for _, service := range services { @@ -178,19 +148,19 @@ func (n *Node) Start() error { return err } // Start each of the services - started := []string{} - for id, service := range services { + started := []reflect.Type{} + for kind, service := range services { // Start the next service, stopping all previous upon failure if err := service.Start(running); err != nil { - for _, id := range started { - services[id].Stop() + for _, kind := range started { + services[kind].Stop() } running.Stop() return err } // Mark the service started for potential cleanup - started = append(started, id) + started = append(started, kind) } // Finish initializing the startup n.services = services @@ -212,11 +182,11 @@ func (n *Node) Stop() error { } // Otherwise terminate all the services and the P2P server too failure := &StopError{ - Services: make(map[string]error), + Services: make(map[reflect.Type]error), } - for id, service := range n.services { + for kind, service := range n.services { if err := service.Stop(); err != nil { - failure.Services[id] = err + failure.Services[kind] = err } } n.server.Stop() @@ -266,40 +236,22 @@ func (n *Node) Server() *p2p.Server { return n.server } -// Service retrieves a currently running service registered under a given id. -func (n *Node) Service(id string) Service { - n.lock.RLock() - defer n.lock.RUnlock() - - // Short circuit if the node's not running - if n.server == nil { - return nil - } - return n.services[id] -} - -// SingletonService retrieves a currently running service using a specific type -// implementing the Service interface. This is a utility function for scenarios -// where it is known that only one instance of a given service type is running, -// allowing to access services without needing to know their specific id with -// which they were registered. Note, this method uses reflection, so do not run -// in a tight loop. -func (n *Node) SingletonService(service interface{}) (string, error) { +// Service retrieves a currently running service registered of a specific type. +func (n *Node) Service(service interface{}) error { n.lock.RLock() defer n.lock.RUnlock() // Short circuit if the node's not running if n.server == nil { - return "", ErrServiceUnknown + return ErrNodeStopped } // Otherwise try to find the service to return - for id, running := range n.services { - if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() { - reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running)) - return id, nil - } + element := reflect.ValueOf(service).Elem() + if running, ok := n.services[element.Type()]; ok { + element.Set(reflect.ValueOf(running)) + return nil } - return "", ErrServiceUnknown + return ErrServiceUnknown } // DataDir retrieves the current datadir used by the protocol stack. diff --git a/node/node_example_test.go b/node/node_example_test.go index 898d4f5bc..2f9b49a56 100644 --- a/node/node_example_test.go +++ b/node/node_example_test.go @@ -66,7 +66,7 @@ func ExampleUsage() { constructor := func(context *node.ServiceContext) (node.Service, error) { return new(SampleService), nil } - if err := stack.Register("my sample service", constructor); err != nil { + if err := stack.Register(constructor); err != nil { log.Fatalf("Failed to register service: %v", err) } // Boot up the entire protocol stack, do a restart and terminate diff --git a/node/node_test.go b/node/node_test.go index 1d5570e42..ef096fc91 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -20,6 +20,7 @@ import ( "errors" "io/ioutil" "os" + "reflect" "testing" "github.com/ethereum/go-ethereum/crypto" @@ -98,79 +99,36 @@ func TestNodeUsedDataDir(t *testing.T) { } } -// NoopService is a trivial implementation of the Service interface. -type NoopService struct{} - -func (s *NoopService) Protocols() []p2p.Protocol { return nil } -func (s *NoopService) Start(*p2p.Server) error { return nil } -func (s *NoopService) Stop() error { return nil } - -func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService), nil } - -// Tests whether services can be registered and unregistered. +// Tests whether services can be registered and duplicates caught. func TestServiceRegistry(t *testing.T) { stack, err := New(testNodeConfig) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } - // Create a batch of dummy services and ensure they don't exist - ids := []string{"A", "B", "C"} - for i, id := range ids { - if err := stack.Unregister(id); err != ErrServiceUnknown { - t.Fatalf("service %d: pre-unregistration failure mismatch: have %v, want %v", i, err, ErrServiceUnknown) + // Register a batch of unique services and ensure they start successfully + services := []ServiceConstructor{NewNoopServiceA, NewNoopServiceB, NewNoopServiceC} + for i, constructor := range services { + if err := stack.Register(constructor); err != nil { + t.Fatalf("service #%d: registration failed: %v", i, err) } } - // Register the services, checking that the operation succeeds only once - for i, id := range ids { - if err := stack.Register(id, NewNoopService); err != nil { - t.Fatalf("service %d: registration failed: %v", i, err) - } - if err := stack.Register(id, NewNoopService); err != ErrServiceRegistered { - t.Fatalf("service %d: registration failure mismatch: have %v, want %v", i, err, ErrServiceRegistered) - } - } - // Unregister the services, checking that the operation succeeds only once - for i, id := range ids { - if err := stack.Unregister(id); err != nil { - t.Fatalf("service %d: unregistration failed: %v", i, err) - } - if err := stack.Unregister(id); err != ErrServiceUnknown { - t.Fatalf("service %d: unregistration failure mismatch: have %v, want %v", i, err, ErrServiceUnknown) - } + if err := stack.Start(); err != nil { + t.Fatalf("failed to start original service stack: %v", err) } -} - -// InstrumentedService is an implementation of Service for which all interface -// methods can be instrumented both return value as well as event hook wise. -type InstrumentedService struct { - protocols []p2p.Protocol - start error - stop error - - protocolsHook func() - startHook func(*p2p.Server) - stopHook func() -} - -func (s *InstrumentedService) Protocols() []p2p.Protocol { - if s.protocolsHook != nil { - s.protocolsHook() + if err := stack.Stop(); err != nil { + t.Fatalf("failed to stop original service stack: %v", err) } - return s.protocols -} - -func (s *InstrumentedService) Start(server *p2p.Server) error { - if s.startHook != nil { - s.startHook(server) + // Duplicate one of the services and retry starting the node + if err := stack.Register(NewNoopServiceB); err != nil { + t.Fatalf("duplicate registration failed: %v", err) } - return s.start -} - -func (s *InstrumentedService) Stop() error { - if s.stopHook != nil { - s.stopHook() + if err := stack.Start(); err == nil { + t.Fatalf("duplicate service started") + } else { + if _, ok := err.(*DuplicateServiceError); !ok { + t.Fatalf("duplicate error mismatch: have %v, want %v", err, DuplicateServiceError{}) + } } - return s.stop } // Tests that registered services get started and stopped correctly. @@ -180,12 +138,15 @@ func TestServiceLifeCycle(t *testing.T) { t.Fatalf("failed to create protocol stack: %v", err) } // Register a batch of life-cycle instrumented services - ids := []string{"A", "B", "C"} - + services := map[string]InstrumentingWrapper{ + "A": InstrumentedServiceMakerA, + "B": InstrumentedServiceMakerB, + "C": InstrumentedServiceMakerC, + } started := make(map[string]bool) stopped := make(map[string]bool) - for i, id := range ids { + for id, maker := range services { id := id // Closure for the constructor constructor := func(*ServiceContext) (Service, error) { return &InstrumentedService{ @@ -193,35 +154,29 @@ func TestServiceLifeCycle(t *testing.T) { stopHook: func() { stopped[id] = true }, }, nil } - if err := stack.Register(id, constructor); err != nil { - t.Fatalf("service %d: registration failed: %v", i, err) + if err := stack.Register(maker(constructor)); err != nil { + t.Fatalf("service %s: registration failed: %v", id, err) } } // Start the node and check that all services are running if err := stack.Start(); err != nil { t.Fatalf("failed to start protocol stack: %v", err) } - for i, id := range ids { + for id, _ := range services { if !started[id] { - t.Fatalf("service %d: freshly started service not running", i) + t.Fatalf("service %s: freshly started service not running", id) } if stopped[id] { - t.Fatalf("service %d: freshly started service already stopped", i) - } - if stack.Service(id) == nil { - t.Fatalf("service %d: freshly started service unaccessible", i) + t.Fatalf("service %s: freshly started service already stopped", id) } } // Stop the node and check that all services have been stopped if err := stack.Stop(); err != nil { t.Fatalf("failed to stop protocol stack: %v", err) } - for i, id := range ids { + for id, _ := range services { if !stopped[id] { - t.Fatalf("service %d: freshly terminated service still running", i) - } - if service := stack.Service(id); service != nil { - t.Fatalf("service %d: freshly terminated service still accessible: %v", i, service) + t.Fatalf("service %s: freshly terminated service still running", id) } } } @@ -251,7 +206,7 @@ func TestServiceRestarts(t *testing.T) { }, nil } // Register the service and start the protocol stack - if err := stack.Register("service", constructor); err != nil { + if err := stack.Register(constructor); err != nil { t.Fatalf("failed to register the service: %v", err) } if err := stack.Start(); err != nil { @@ -281,18 +236,21 @@ func TestServiceConstructionAbortion(t *testing.T) { t.Fatalf("failed to create protocol stack: %v", err) } // Define a batch of good services - ids := []string{"A", "B", "C", "D", "E", "F"} - + services := map[string]InstrumentingWrapper{ + "A": InstrumentedServiceMakerA, + "B": InstrumentedServiceMakerB, + "C": InstrumentedServiceMakerC, + } started := make(map[string]bool) - for i, id := range ids { + for id, maker := range services { id := id // Closure for the constructor constructor := func(*ServiceContext) (Service, error) { return &InstrumentedService{ startHook: func(*p2p.Server) { started[id] = true }, }, nil } - if err := stack.Register(id, constructor); err != nil { - t.Fatalf("service %d: registration failed: %v", i, err) + if err := stack.Register(maker(constructor)); err != nil { + t.Fatalf("service %s: registration failed: %v", id, err) } } // Register a service that fails to construct itself @@ -300,7 +258,7 @@ func TestServiceConstructionAbortion(t *testing.T) { failer := func(*ServiceContext) (Service, error) { return nil, failure } - if err := stack.Register("failer", failer); err != nil { + if err := stack.Register(failer); err != nil { t.Fatalf("failer registration failed: %v", err) } // Start the protocol stack and ensure none of the services get started @@ -308,9 +266,9 @@ func TestServiceConstructionAbortion(t *testing.T) { if err := stack.Start(); err != failure { t.Fatalf("iter %d: stack startup failure mismatch: have %v, want %v", i, err, failure) } - for i, id := range ids { + for id, _ := range services { if started[id] { - t.Fatalf("service %d: started should not have", i) + t.Fatalf("service %s: started should not have", id) } delete(started, id) } @@ -325,12 +283,15 @@ func TestServiceStartupAbortion(t *testing.T) { t.Fatalf("failed to create protocol stack: %v", err) } // Register a batch of good services - ids := []string{"A", "B", "C", "D", "E", "F"} - + services := map[string]InstrumentingWrapper{ + "A": InstrumentedServiceMakerA, + "B": InstrumentedServiceMakerB, + "C": InstrumentedServiceMakerC, + } started := make(map[string]bool) stopped := make(map[string]bool) - for i, id := range ids { + for id, maker := range services { id := id // Closure for the constructor constructor := func(*ServiceContext) (Service, error) { return &InstrumentedService{ @@ -338,8 +299,8 @@ func TestServiceStartupAbortion(t *testing.T) { stopHook: func() { stopped[id] = true }, }, nil } - if err := stack.Register(id, constructor); err != nil { - t.Fatalf("service %d: registration failed: %v", i, err) + if err := stack.Register(maker(constructor)); err != nil { + t.Fatalf("service %s: registration failed: %v", id, err) } } // Register a service that fails to start @@ -349,7 +310,7 @@ func TestServiceStartupAbortion(t *testing.T) { start: failure, }, nil } - if err := stack.Register("failer", failer); err != nil { + if err := stack.Register(failer); err != nil { t.Fatalf("failer registration failed: %v", err) } // Start the protocol stack and ensure all started services stop @@ -357,9 +318,9 @@ func TestServiceStartupAbortion(t *testing.T) { if err := stack.Start(); err != failure { t.Fatalf("iter %d: stack startup failure mismatch: have %v, want %v", i, err, failure) } - for i, id := range ids { + for id, _ := range services { if started[id] && !stopped[id] { - t.Fatalf("service %d: started but not stopped", i) + t.Fatalf("service %s: started but not stopped", id) } delete(started, id) delete(stopped, id) @@ -375,12 +336,15 @@ func TestServiceTerminationGuarantee(t *testing.T) { t.Fatalf("failed to create protocol stack: %v", err) } // Register a batch of good services - ids := []string{"A", "B", "C", "D", "E", "F"} - + services := map[string]InstrumentingWrapper{ + "A": InstrumentedServiceMakerA, + "B": InstrumentedServiceMakerB, + "C": InstrumentedServiceMakerC, + } started := make(map[string]bool) stopped := make(map[string]bool) - for i, id := range ids { + for id, maker := range services { id := id // Closure for the constructor constructor := func(*ServiceContext) (Service, error) { return &InstrumentedService{ @@ -388,8 +352,8 @@ func TestServiceTerminationGuarantee(t *testing.T) { stopHook: func() { stopped[id] = true }, }, nil } - if err := stack.Register(id, constructor); err != nil { - t.Fatalf("service %d: registration failed: %v", i, err) + if err := stack.Register(maker(constructor)); err != nil { + t.Fatalf("service %s: registration failed: %v", id, err) } } // Register a service that fails to shot down cleanly @@ -399,7 +363,7 @@ func TestServiceTerminationGuarantee(t *testing.T) { stop: failure, }, nil } - if err := stack.Register("failer", failer); err != nil { + if err := stack.Register(failer); err != nil { t.Fatalf("failer registration failed: %v", err) } // Start the protocol stack, and ensure that a failing shut down terminates all @@ -408,12 +372,12 @@ func TestServiceTerminationGuarantee(t *testing.T) { if err := stack.Start(); err != nil { t.Fatalf("iter %d: failed to start protocol stack: %v", i, err) } - for j, id := range ids { + for id, _ := range services { if !started[id] { - t.Fatalf("iter %d, service %d: service not running", i, j) + t.Fatalf("iter %d, service %s: service not running", i, id) } if stopped[id] { - t.Fatalf("iter %d, service %d: service already stopped", i, j) + t.Fatalf("iter %d, service %s: service already stopped", i, id) } } // Stop the stack, verify failure and check all terminations @@ -421,16 +385,17 @@ func TestServiceTerminationGuarantee(t *testing.T) { if err, ok := err.(*StopError); !ok { t.Fatalf("iter %d: termination failure mismatch: have %v, want StopError", i, err) } else { - if err.Services["failer"] != failure { - t.Fatalf("iter %d: failer termination failure mismatch: have %v, want %v", i, err.Services["failer"], failure) + failer := reflect.TypeOf(&InstrumentedService{}) + if err.Services[failer] != failure { + t.Fatalf("iter %d: failer termination failure mismatch: have %v, want %v", i, err.Services[failer], failure) } if len(err.Services) != 1 { t.Fatalf("iter %d: failure count mismatch: have %d, want %d", i, len(err.Services), 1) } } - for j, id := range ids { + for id, _ := range services { if !stopped[id] { - t.Fatalf("iter %d, service %d: service not terminated", i, j) + t.Fatalf("iter %d, service %s: service not terminated", i, id) } delete(started, id) delete(stopped, id) @@ -438,27 +403,27 @@ func TestServiceTerminationGuarantee(t *testing.T) { } } -// TestSingletonServiceRetrieval tests that singleton services can be retrieved. -func TestSingletonServiceRetrieval(t *testing.T) { +// TestServiceRetrieval tests that individual services can be retrieved. +func TestServiceRetrieval(t *testing.T) { // Create a simple stack and register two service types stack, err := New(testNodeConfig) if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } - if err := stack.Register("noop", func(*ServiceContext) (Service, error) { return new(NoopService), nil }); err != nil { + if err := stack.Register(NewNoopService); err != nil { t.Fatalf("noop service registration failed: %v", err) } - if err := stack.Register("instrumented", func(*ServiceContext) (Service, error) { return new(InstrumentedService), nil }); err != nil { + if err := stack.Register(NewInstrumentedService); err != nil { t.Fatalf("instrumented service registration failed: %v", err) } // Make sure none of the services can be retrieved until started var noopServ *NoopService - if id, err := stack.SingletonService(&noopServ); id != "" || err != ErrServiceUnknown { - t.Fatalf("noop service retrieval mismatch: have %v/%v, want %v/%v", id, err, "", ErrServiceUnknown) + if err := stack.Service(&noopServ); err != ErrNodeStopped { + t.Fatalf("noop service retrieval mismatch: have %v, want %v", err, ErrNodeStopped) } var instServ *InstrumentedService - if id, err := stack.SingletonService(&instServ); id != "" || err != ErrServiceUnknown { - t.Fatalf("instrumented service retrieval mismatch: have %v/%v, want %v/%v", id, err, "", ErrServiceUnknown) + if err := stack.Service(&instServ); err != ErrNodeStopped { + t.Fatalf("instrumented service retrieval mismatch: have %v, want %v", err, ErrNodeStopped) } // Start the stack and ensure everything is retrievable now if err := stack.Start(); err != nil { @@ -466,11 +431,11 @@ func TestSingletonServiceRetrieval(t *testing.T) { } defer stack.Stop() - if id, err := stack.SingletonService(&noopServ); id != "noop" || err != nil { - t.Fatalf("noop service retrieval mismatch: have %v/%v, want %v/%v", id, err, "noop", nil) + if err := stack.Service(&noopServ); err != nil { + t.Fatalf("noop service retrieval mismatch: have %v, want %v", err, nil) } - if id, err := stack.SingletonService(&instServ); id != "instrumented" || err != nil { - t.Fatalf("instrumented service retrieval mismatch: have %v/%v, want %v/%v", id, err, "instrumented", nil) + if err := stack.Service(&instServ); err != nil { + t.Fatalf("instrumented service retrieval mismatch: have %v, want %v", err, nil) } } @@ -481,13 +446,16 @@ func TestProtocolGather(t *testing.T) { t.Fatalf("failed to create protocol stack: %v", err) } // Register a batch of services with some configured number of protocols - services := map[string]int{ - "Zero Protocols": 0, - "Single Protocol": 1, - "Many Protocols": 25, - } - for id, count := range services { - protocols := make([]p2p.Protocol, count) + services := map[string]struct { + Count int + Maker InstrumentingWrapper + }{ + "Zero Protocols": {0, InstrumentedServiceMakerA}, + "Single Protocol": {1, InstrumentedServiceMakerB}, + "Many Protocols": {25, InstrumentedServiceMakerC}, + } + for id, config := range services { + protocols := make([]p2p.Protocol, config.Count) for i := 0; i < len(protocols); i++ { protocols[i].Name = id protocols[i].Version = uint(i) @@ -497,7 +465,7 @@ func TestProtocolGather(t *testing.T) { protocols: protocols, }, nil } - if err := stack.Register(id, constructor); err != nil { + if err := stack.Register(config.Maker(constructor)); err != nil { t.Fatalf("service %s: registration failed: %v", id, err) } } @@ -511,8 +479,8 @@ func TestProtocolGather(t *testing.T) { if len(protocols) != 26 { t.Fatalf("mismatching number of protocols launched: have %d, want %d", len(protocols), 26) } - for id, count := range services { - for ver := 0; ver < count; ver++ { + for id, config := range services { + for ver := 0; ver < config.Count; ver++ { launched := false for i := 0; i < len(protocols); i++ { if protocols[i].Name == id && protocols[i].Version == uint(ver) { diff --git a/node/service.go b/node/service.go index 28fc34764..bfeeb7ab9 100644 --- a/node/service.go +++ b/node/service.go @@ -29,39 +29,29 @@ import ( // the protocol stack, that is passed to all constructors to be optionally used; // as well as utility methods to operate on the service environment. type ServiceContext struct { - datadir string // Data directory for protocol persistence - services map[string]Service // Index of the already constructed services - EventMux *event.TypeMux // Event multiplexer used for decoupled notifications + datadir string // Data directory for protocol persistence + services map[reflect.Type]Service // Index of the already constructed services + EventMux *event.TypeMux // Event multiplexer used for decoupled notifications } -// Database opens an existing database with the given name (or creates one if no -// previous can be found) from within the node's data directory. If the node is -// an ephemeral one, a memory database is returned. -func (ctx *ServiceContext) Database(name string, cache int) (ethdb.Database, error) { +// OpenDatabase opens an existing database with the given name (or creates one +// if no previous can be found) from within the node's data directory. If the +// node is an ephemeral one, a memory database is returned. +func (ctx *ServiceContext) OpenDatabase(name string, cache int) (ethdb.Database, error) { if ctx.datadir == "" { return ethdb.NewMemDatabase() } return ethdb.NewLDBDatabase(filepath.Join(ctx.datadir, name), cache) } -// Service retrieves an already constructed service registered under a given id. -func (ctx *ServiceContext) Service(id string) Service { - return ctx.services[id] -} - -// SingletonService retrieves an already constructed service using a specific type -// implementing the Service interface. This is a utility function for scenarios -// where it is known that only one instance of a given service type is running, -// allowing to access services without needing to know their specific id with -// which they were registered. -func (ctx *ServiceContext) SingletonService(service interface{}) (string, error) { - for id, running := range ctx.services { - if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() { - reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running)) - return id, nil - } +// Service retrieves a currently running service registered of a specific type. +func (ctx *ServiceContext) Service(service interface{}) error { + element := reflect.ValueOf(service).Elem() + if running, ok := ctx.services[element.Type()]; ok { + element.Set(reflect.ValueOf(running)) + return nil } - return "", ErrServiceUnknown + return ErrServiceUnknown } // ServiceConstructor is the function signature of the constructors needed to be diff --git a/node/service_test.go b/node/service_test.go index 921a1a012..1dfb61f73 100644 --- a/node/service_test.go +++ b/node/service_test.go @@ -39,7 +39,7 @@ func TestContextDatabases(t *testing.T) { } // Request the opening/creation of a database and ensure it persists to disk ctx := &ServiceContext{datadir: dir} - db, err := ctx.Database("persistent", 0) + db, err := ctx.OpenDatabase("persistent", 0) if err != nil { t.Fatalf("failed to open persistent database: %v", err) } @@ -50,7 +50,7 @@ func TestContextDatabases(t *testing.T) { } // Request th opening/creation of an ephemeral database and ensure it's not persisted ctx = &ServiceContext{datadir: ""} - db, err = ctx.Database("ephemeral", 0) + db, err = ctx.OpenDatabase("ephemeral", 0) if err != nil { t.Fatalf("failed to open ephemeral database: %v", err) } @@ -67,36 +67,27 @@ func TestContextServices(t *testing.T) { if err != nil { t.Fatalf("failed to create protocol stack: %v", err) } - // Define a set of services, constructed before/after a verifier - formers := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"} - latters := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"} - + // Define a verifier that ensures a NoopA is before it and NoopB after verifier := func(ctx *ServiceContext) (Service, error) { - for i, id := range formers { - if ctx.Service(id) == nil { - return nil, fmt.Errorf("former %d: service not found", i) - } + var objA *NoopServiceA + if ctx.Service(&objA) != nil { + return nil, fmt.Errorf("former service not found") } - for i, id := range latters { - if ctx.Service(id) != nil { - return nil, fmt.Errorf("latters %d: service found", i) - } + var objB *NoopServiceB + if err := ctx.Service(&objB); err != ErrServiceUnknown { + return nil, fmt.Errorf("latters lookup error mismatch: have %v, want %v", err, ErrServiceUnknown) } return new(NoopService), nil } // Register the collection of services - for i, id := range formers { - if err := stack.Register(id, NewNoopService); err != nil { - t.Fatalf("former #%d: failed to register service: %v", i, err) - } + if err := stack.Register(NewNoopServiceA); err != nil { + t.Fatalf("former failed to register service: %v", err) } - if err := stack.Register("verifier", verifier); err != nil { + if err := stack.Register(verifier); err != nil { t.Fatalf("failed to register service verifier: %v", err) } - for i, id := range latters { - if err := stack.Register(id, NewNoopService); err != nil { - t.Fatalf("latter #%d: failed to register service: %v", i, err) - } + if err := stack.Register(NewNoopServiceB); err != nil { + t.Fatalf("latter failed to register service: %v", err) } // Start the protocol stack and ensure services are constructed in order if err := stack.Start(); err != nil { diff --git a/node/utils_test.go b/node/utils_test.go new file mode 100644 index 000000000..756622c86 --- /dev/null +++ b/node/utils_test.go @@ -0,0 +1,117 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Contains a batch of utility type declarations used by the tests. As the node +// operates on unique types, a lot of them are needed to check various features. + +package node + +import ( + "reflect" + + "github.com/ethereum/go-ethereum/p2p" +) + +// NoopService is a trivial implementation of the Service interface. +type NoopService struct{} + +func (s *NoopService) Protocols() []p2p.Protocol { return nil } +func (s *NoopService) Start(*p2p.Server) error { return nil } +func (s *NoopService) Stop() error { return nil } + +func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService), nil } + +// Set of services all wrapping the base NoopService resulting in the same method +// signatures but different outer types. +type NoopServiceA struct{ NoopService } +type NoopServiceB struct{ NoopService } +type NoopServiceC struct{ NoopService } +type NoopServiceD struct{ NoopService } + +func NewNoopServiceA(*ServiceContext) (Service, error) { return new(NoopServiceA), nil } +func NewNoopServiceB(*ServiceContext) (Service, error) { return new(NoopServiceB), nil } +func NewNoopServiceC(*ServiceContext) (Service, error) { return new(NoopServiceC), nil } +func NewNoopServiceD(*ServiceContext) (Service, error) { return new(NoopServiceD), nil } + +// InstrumentedService is an implementation of Service for which all interface +// methods can be instrumented both return value as well as event hook wise. +type InstrumentedService struct { + protocols []p2p.Protocol + start error + stop error + + protocolsHook func() + startHook func(*p2p.Server) + stopHook func() +} + +func NewInstrumentedService(*ServiceContext) (Service, error) { return new(InstrumentedService), nil } + +func (s *InstrumentedService) Protocols() []p2p.Protocol { + if s.protocolsHook != nil { + s.protocolsHook() + } + return s.protocols +} + +func (s *InstrumentedService) Start(server *p2p.Server) error { + if s.startHook != nil { + s.startHook(server) + } + return s.start +} + +func (s *InstrumentedService) Stop() error { + if s.stopHook != nil { + s.stopHook() + } + return s.stop +} + +// InstrumentingWrapper is a method to specialize a service constructor returning +// a generic InstrumentedService into one returning a wrapping specific one. +type InstrumentingWrapper func(base ServiceConstructor) ServiceConstructor + +func InstrumentingWrapperMaker(base ServiceConstructor, kind reflect.Type) ServiceConstructor { + return func(ctx *ServiceContext) (Service, error) { + obj, err := base(ctx) + if err != nil { + return nil, err + } + wrapper := reflect.New(kind) + wrapper.Elem().Field(0).Set(reflect.ValueOf(obj).Elem()) + + return wrapper.Interface().(Service), nil + } +} + +// Set of services all wrapping the base InstrumentedService resulting in the +// same method signatures but different outer types. +type InstrumentedServiceA struct{ InstrumentedService } +type InstrumentedServiceB struct{ InstrumentedService } +type InstrumentedServiceC struct{ InstrumentedService } + +func InstrumentedServiceMakerA(base ServiceConstructor) ServiceConstructor { + return InstrumentingWrapperMaker(base, reflect.TypeOf(InstrumentedServiceA{})) +} + +func InstrumentedServiceMakerB(base ServiceConstructor) ServiceConstructor { + return InstrumentingWrapperMaker(base, reflect.TypeOf(InstrumentedServiceB{})) +} + +func InstrumentedServiceMakerC(base ServiceConstructor) ServiceConstructor { + return InstrumentingWrapperMaker(base, reflect.TypeOf(InstrumentedServiceC{})) +} diff --git a/rpc/api/admin.go b/rpc/api/admin.go index 4682062e0..1133c9bca 100644 --- a/rpc/api/admin.go +++ b/rpc/api/admin.go @@ -97,7 +97,7 @@ func NewAdminApi(xeth *xeth.XEth, stack *node.Node, codec codec.Codec) *adminApi coder: codec.New(nil), } if stack != nil { - stack.SingletonService(&api.ethereum) + stack.Service(&api.ethereum) } return api } diff --git a/rpc/api/utils.go b/rpc/api/utils.go index 6e372c061..d6820cd2e 100644 --- a/rpc/api/utils.go +++ b/rpc/api/utils.go @@ -165,7 +165,7 @@ func ParseApiString(apistr string, codec codec.Codec, xeth *xeth.XEth, stack *no var eth *eth.Ethereum if stack != nil { - if _, err := stack.SingletonService(ð); err != nil { + if err := stack.Service(ð); err != nil { return nil, err } } diff --git a/xeth/state.go b/xeth/state.go index e67dc4b5f..7daccb525 100644 --- a/xeth/state.go +++ b/xeth/state.go @@ -19,7 +19,6 @@ package xeth import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/eth" ) type State struct { @@ -46,7 +45,7 @@ func (self *State) SafeGet(addr string) *Object { func (self *State) safeGet(addr string) *state.StateObject { object := self.state.GetStateObject(common.HexToAddress(addr)) if object == nil { - object = state.NewStateObject(common.HexToAddress(addr), self.xeth.backend.Service("eth").(*eth.Ethereum).ChainDb()) + object = state.NewStateObject(common.HexToAddress(addr), self.xeth.EthereumService().ChainDb()) } return object } diff --git a/xeth/xeth.go b/xeth/xeth.go index 6bc94273e..5e773d1c9 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -100,8 +100,8 @@ func New(stack *node.Node, frontend Frontend) *XEth { ethereum *eth.Ethereum whisper *whisper.Whisper ) - stack.SingletonService(ðereum) - stack.SingletonService(&whisper) + stack.Service(ðereum) + stack.Service(&whisper) xeth := &XEth{ backend: stack, @@ -130,7 +130,7 @@ func New(stack *node.Node, frontend Frontend) *XEth { func (self *XEth) EthereumService() *eth.Ethereum { var ethereum *eth.Ethereum - if _, err := self.backend.SingletonService(ðereum); err != nil { + if err := self.backend.Service(ðereum); err != nil { return nil } return ethereum @@ -138,7 +138,7 @@ func (self *XEth) EthereumService() *eth.Ethereum { func (self *XEth) WhisperService() *whisper.Whisper { var whisper *whisper.Whisper - if _, err := self.backend.SingletonService(&whisper); err != nil { + if err := self.backend.Service(&whisper); err != nil { return nil } return whisper -- cgit v1.2.3