diff options
Diffstat (limited to 'node/node.go')
-rw-r--r-- | node/node.go | 126 |
1 files changed, 39 insertions, 87 deletions
diff --git a/node/node.go b/node/node.go index 023f77403..5566bc44b 100644 --- a/node/node.go +++ b/node/node.go @@ -30,16 +30,16 @@ import ( ) var ( - ErrDatadirUsed = errors.New("datadir already used") - ErrNodeStopped = errors.New("node not started") - ErrNodeRunning = errors.New("node already running") - ErrServiceUnknown = errors.New("service not registered") - ErrServiceRegistered = errors.New("service already registered") + ErrDatadirUsed = errors.New("datadir already used") + ErrNodeStopped = errors.New("node not started") + ErrNodeRunning = errors.New("node already running") + ErrServiceUnknown = errors.New("unknown service") datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} ) -// Node represents a P2P node into which arbitrary services might be registered. +// Node represents a P2P node into which arbitrary (uniquely typed) services might +// be registered. type Node struct { datadir string // Path to the currently used data directory eventmux *event.TypeMux // Event multiplexer used between the services of a stack @@ -47,9 +47,8 @@ type Node struct { serverConfig *p2p.Server // Configuration of the underlying P2P networking layer server *p2p.Server // Currently running P2P networking layer - serviceIndex map[string]ServiceConstructor // Set of services currently registered in the node - serviceOrder []string // Service construction order to handle dependencies - services map[string]Service // Currently running services + serviceFuncs []ServiceConstructor // Service constructors (in dependency order) + services map[reflect.Type]Service // Currently running services stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex @@ -85,52 +84,21 @@ func New(conf *Config) (*Node, error) { MaxPeers: conf.MaxPeers, MaxPendingPeers: conf.MaxPendingPeers, }, - serviceIndex: make(map[string]ServiceConstructor), - serviceOrder: []string{}, + serviceFuncs: []ServiceConstructor{}, eventmux: new(event.TypeMux), }, nil } -// Register injects a new service into the node's stack. -func (n *Node) Register(id string, constructor ServiceConstructor) error { +// Register injects a new service into the node's stack. The service created by +// the passed constructor must be unique in its type with regard to sibling ones. +func (n *Node) Register(constructor ServiceConstructor) error { n.lock.Lock() defer n.lock.Unlock() - // Short circuit if the node is running or if the id is taken if n.server != nil { return ErrNodeRunning } - if _, ok := n.serviceIndex[id]; ok { - return ErrServiceRegistered - } - // Otherwise register the service and return - n.serviceOrder = append(n.serviceOrder, id) - n.serviceIndex[id] = constructor - - return nil -} - -// Unregister removes a service from a node's stack. If the node is currently -// running, an error will be returned. -func (n *Node) Unregister(id string) error { - n.lock.Lock() - defer n.lock.Unlock() - - // Short circuit if the node is running, or if the service is unknown - if n.server != nil { - return ErrNodeRunning - } - if _, ok := n.serviceIndex[id]; !ok { - return ErrServiceUnknown - } - // Otherwise drop the service and return - delete(n.serviceIndex, id) - for i, service := range n.serviceOrder { - if service == id { - n.serviceOrder = append(n.serviceOrder[:i], n.serviceOrder[i+1:]...) - break - } - } + n.serviceFuncs = append(n.serviceFuncs, constructor) return nil } @@ -147,25 +115,27 @@ func (n *Node) Start() error { running := new(p2p.Server) *running = *n.serverConfig - services := make(map[string]Service) - for _, id := range n.serviceOrder { - constructor := n.serviceIndex[id] - + services := make(map[reflect.Type]Service) + for _, constructor := range n.serviceFuncs { // Create a new context for the particular service ctx := &ServiceContext{ datadir: n.datadir, - services: make(map[string]Service), + services: make(map[reflect.Type]Service), EventMux: n.eventmux, } - for id, s := range services { // copy needed for threaded access - ctx.services[id] = s + for kind, s := range services { // copy needed for threaded access + ctx.services[kind] = s } // Construct and save the service service, err := constructor(ctx) if err != nil { return err } - services[id] = service + kind := reflect.TypeOf(service) + if _, exists := services[kind]; exists { + return &DuplicateServiceError{Kind: kind} + } + services[kind] = service } // Gather the protocols and start the freshly assembled P2P server for _, service := range services { @@ -178,19 +148,19 @@ func (n *Node) Start() error { return err } // Start each of the services - started := []string{} - for id, service := range services { + started := []reflect.Type{} + for kind, service := range services { // Start the next service, stopping all previous upon failure if err := service.Start(running); err != nil { - for _, id := range started { - services[id].Stop() + for _, kind := range started { + services[kind].Stop() } running.Stop() return err } // Mark the service started for potential cleanup - started = append(started, id) + started = append(started, kind) } // Finish initializing the startup n.services = services @@ -212,11 +182,11 @@ func (n *Node) Stop() error { } // Otherwise terminate all the services and the P2P server too failure := &StopError{ - Services: make(map[string]error), + Services: make(map[reflect.Type]error), } - for id, service := range n.services { + for kind, service := range n.services { if err := service.Stop(); err != nil { - failure.Services[id] = err + failure.Services[kind] = err } } n.server.Stop() @@ -266,40 +236,22 @@ func (n *Node) Server() *p2p.Server { return n.server } -// Service retrieves a currently running service registered under a given id. -func (n *Node) Service(id string) Service { - n.lock.RLock() - defer n.lock.RUnlock() - - // Short circuit if the node's not running - if n.server == nil { - return nil - } - return n.services[id] -} - -// SingletonService retrieves a currently running service using a specific type -// implementing the Service interface. This is a utility function for scenarios -// where it is known that only one instance of a given service type is running, -// allowing to access services without needing to know their specific id with -// which they were registered. Note, this method uses reflection, so do not run -// in a tight loop. -func (n *Node) SingletonService(service interface{}) (string, error) { +// Service retrieves a currently running service registered of a specific type. +func (n *Node) Service(service interface{}) error { n.lock.RLock() defer n.lock.RUnlock() // Short circuit if the node's not running if n.server == nil { - return "", ErrServiceUnknown + return ErrNodeStopped } // Otherwise try to find the service to return - for id, running := range n.services { - if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() { - reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running)) - return id, nil - } + element := reflect.ValueOf(service).Elem() + if running, ok := n.services[element.Type()]; ok { + element.Set(reflect.ValueOf(running)) + return nil } - return "", ErrServiceUnknown + return ErrServiceUnknown } // DataDir retrieves the current datadir used by the protocol stack. |