diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-11-27 17:41:22 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-11-27 17:41:22 +0800 |
commit | 7dde2b902cf81e90b484b1a48f6d45e0abd10e0f (patch) | |
tree | 9b92cb3e42269697e0a2b553ba31c36aef73cc25 /node/node.go | |
parent | ffe58bf5abe5100b29ac1091c882f586cd3a2ef9 (diff) | |
parent | 3e1000fda3424d880bc43ebbb16d8a33447d4182 (diff) | |
download | dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.tar dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.tar.gz dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.tar.bz2 dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.tar.lz dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.tar.xz dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.tar.zst dexon-7dde2b902cf81e90b484b1a48f6d45e0abd10e0f.zip |
Merge pull request #1970 from karalabe/customizable-protocol-stacks
Customizable protocol stacks
Diffstat (limited to 'node/node.go')
-rw-r--r-- | node/node.go | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/node/node.go b/node/node.go new file mode 100644 index 000000000..5566bc44b --- /dev/null +++ b/node/node.go @@ -0,0 +1,266 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package node represents the Ethereum protocol stack container. +package node + +import ( + "errors" + "os" + "path/filepath" + "reflect" + "sync" + "syscall" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" +) + +var ( + ErrDatadirUsed = errors.New("datadir already used") + ErrNodeStopped = errors.New("node not started") + ErrNodeRunning = errors.New("node already running") + ErrServiceUnknown = errors.New("unknown service") + + datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} +) + +// Node represents a P2P node into which arbitrary (uniquely typed) services might +// be registered. +type Node struct { + datadir string // Path to the currently used data directory + eventmux *event.TypeMux // Event multiplexer used between the services of a stack + + serverConfig *p2p.Server // Configuration of the underlying P2P networking layer + server *p2p.Server // Currently running P2P networking layer + + serviceFuncs []ServiceConstructor // Service constructors (in dependency order) + services map[reflect.Type]Service // Currently running services + + stop chan struct{} // Channel to wait for termination notifications + lock sync.RWMutex +} + +// New creates a new P2P node, ready for protocol registration. +func New(conf *Config) (*Node, error) { + // Ensure the data directory exists, failing if it cannot be created + if conf.DataDir != "" { + if err := os.MkdirAll(conf.DataDir, 0700); err != nil { + return nil, err + } + } + // Assemble the networking layer and the node itself + nodeDbPath := "" + if conf.DataDir != "" { + nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase) + } + return &Node{ + datadir: conf.DataDir, + serverConfig: &p2p.Server{ + PrivateKey: conf.NodeKey(), + Name: conf.Name, + Discovery: !conf.NoDiscovery, + BootstrapNodes: conf.BootstrapNodes, + StaticNodes: conf.StaticNodes(), + TrustedNodes: conf.TrusterNodes(), + NodeDatabase: nodeDbPath, + ListenAddr: conf.ListenAddr, + NAT: conf.NAT, + Dialer: conf.Dialer, + NoDial: conf.NoDial, + MaxPeers: conf.MaxPeers, + MaxPendingPeers: conf.MaxPendingPeers, + }, + serviceFuncs: []ServiceConstructor{}, + eventmux: new(event.TypeMux), + }, nil +} + +// Register injects a new service into the node's stack. The service created by +// the passed constructor must be unique in its type with regard to sibling ones. +func (n *Node) Register(constructor ServiceConstructor) error { + n.lock.Lock() + defer n.lock.Unlock() + + if n.server != nil { + return ErrNodeRunning + } + n.serviceFuncs = append(n.serviceFuncs, constructor) + return nil +} + +// Start create a live P2P node and starts running it. +func (n *Node) Start() error { + n.lock.Lock() + defer n.lock.Unlock() + + // Short circuit if the node's already running + if n.server != nil { + return ErrNodeRunning + } + // Otherwise copy and specialize the P2P configuration + running := new(p2p.Server) + *running = *n.serverConfig + + services := make(map[reflect.Type]Service) + for _, constructor := range n.serviceFuncs { + // Create a new context for the particular service + ctx := &ServiceContext{ + datadir: n.datadir, + services: make(map[reflect.Type]Service), + EventMux: n.eventmux, + } + for kind, s := range services { // copy needed for threaded access + ctx.services[kind] = s + } + // Construct and save the service + service, err := constructor(ctx) + if err != nil { + return err + } + kind := reflect.TypeOf(service) + if _, exists := services[kind]; exists { + return &DuplicateServiceError{Kind: kind} + } + services[kind] = service + } + // Gather the protocols and start the freshly assembled P2P server + for _, service := range services { + running.Protocols = append(running.Protocols, service.Protocols()...) + } + if err := running.Start(); err != nil { + if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] { + return ErrDatadirUsed + } + return err + } + // Start each of the services + started := []reflect.Type{} + for kind, service := range services { + // Start the next service, stopping all previous upon failure + if err := service.Start(running); err != nil { + for _, kind := range started { + services[kind].Stop() + } + running.Stop() + + return err + } + // Mark the service started for potential cleanup + started = append(started, kind) + } + // Finish initializing the startup + n.services = services + n.server = running + n.stop = make(chan struct{}) + + return nil +} + +// Stop terminates a running node along with all it's services. In the node was +// not started, an error is returned. +func (n *Node) Stop() error { + n.lock.Lock() + defer n.lock.Unlock() + + // Short circuit if the node's not running + if n.server == nil { + return ErrNodeStopped + } + // Otherwise terminate all the services and the P2P server too + failure := &StopError{ + Services: make(map[reflect.Type]error), + } + for kind, service := range n.services { + if err := service.Stop(); err != nil { + failure.Services[kind] = err + } + } + n.server.Stop() + + n.services = nil + n.server = nil + close(n.stop) + + if len(failure.Services) > 0 { + return failure + } + return nil +} + +// Wait blocks the thread until the node is stopped. If the node is not running +// at the time of invocation, the method immediately returns. +func (n *Node) Wait() { + n.lock.RLock() + if n.server == nil { + return + } + stop := n.stop + n.lock.RUnlock() + + <-stop +} + +// Restart terminates a running node and boots up a new one in its place. If the +// node isn't running, an error is returned. +func (n *Node) Restart() error { + if err := n.Stop(); err != nil { + return err + } + if err := n.Start(); err != nil { + return err + } + return nil +} + +// Server retrieves the currently running P2P network layer. This method is meant +// only to inspect fields of the currently running server, life cycle management +// should be left to this Node entity. +func (n *Node) Server() *p2p.Server { + n.lock.RLock() + defer n.lock.RUnlock() + + return n.server +} + +// Service retrieves a currently running service registered of a specific type. +func (n *Node) Service(service interface{}) error { + n.lock.RLock() + defer n.lock.RUnlock() + + // Short circuit if the node's not running + if n.server == nil { + return ErrNodeStopped + } + // Otherwise try to find the service to return + element := reflect.ValueOf(service).Elem() + if running, ok := n.services[element.Type()]; ok { + element.Set(reflect.ValueOf(running)) + return nil + } + return ErrServiceUnknown +} + +// DataDir retrieves the current datadir used by the protocol stack. +func (n *Node) DataDir() string { + return n.datadir +} + +// EventMux retrieves the event multiplexer used by all the network services in +// the current protocol stack. +func (n *Node) EventMux() *event.TypeMux { + return n.eventmux +} |