diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-06-27 10:39:46 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-06-27 10:39:46 -0400 |
commit | 67fda2597efe7cec04b037138cef86f1e328cc4c (patch) | |
tree | f68c521b757ec1f00c8fe158b88286a2234226ed /src/lib/Bcfg2 | |
parent | 94d90ae60a82bc3ec104ed558627f896a1082e33 (diff) | |
download | bcfg2-67fda2597efe7cec04b037138cef86f1e328cc4c.tar.gz bcfg2-67fda2597efe7cec04b037138cef86f1e328cc4c.tar.bz2 bcfg2-67fda2597efe7cec04b037138cef86f1e328cc4c.zip |
Options: migrated server core to new option parser
Diffstat (limited to 'src/lib/Bcfg2')
-rw-r--r-- | src/lib/Bcfg2/Server/BuiltinCore.py | 35 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/CherrypyCore.py (renamed from src/lib/Bcfg2/Server/CherryPyCore.py) | 34 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Core.py | 390 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 36 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/SSLServer.py | 30 |
5 files changed, 267 insertions, 258 deletions
diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py index b05ad9d41..85f7fa228 100644 --- a/src/lib/Bcfg2/Server/BuiltinCore.py +++ b/src/lib/Bcfg2/Server/BuiltinCore.py @@ -4,8 +4,9 @@ import sys import time import socket import daemon +import Bcfg2.Options import Bcfg2.Server.Statistics -from Bcfg2.Server.Core import BaseCore, NoExposedMethod +from Bcfg2.Server.Core import NetworkCore, NoExposedMethod from Bcfg2.Compat import xmlrpclib, urlparse from Bcfg2.Server.SSLServer import XMLRPCServer @@ -18,28 +19,28 @@ except ImportError: # pylint: enable=E0611 -class Core(BaseCore): +class BuiltinCore(NetworkCore): """ The built-in server core """ name = 'bcfg2-server' def __init__(self): - BaseCore.__init__(self) + NetworkCore.__init__(self) #: The :class:`Bcfg2.Server.SSLServer.XMLRPCServer` instance #: powering this server core self.server = None - daemon_args = dict(uid=self.setup['daemon_uid'], - gid=self.setup['daemon_gid'], - umask=int(self.setup['umask'], 8)) - if self.setup['daemon']: - daemon_args['pidfile'] = TimeoutPIDLockFile(self.setup['daemon'], - acquire_timeout=5) + daemon_args = dict(uid=Bcfg2.Options.setup.daemon_uid, + gid=Bcfg2.Options.setup.daemon_gid, + umask=int(Bcfg2.Options.setup.umask, 8)) + if Bcfg2.Options.setup.daemon: + daemon_args['pidfile'] = TimeoutPIDLockFile( + Bcfg2.Options.setup.daemon, acquire_timeout=5) #: The :class:`daemon.DaemonContext` used to drop #: privileges, write the PID file (with :class:`PidFile`), #: and daemonize this core. self.context = daemon.DaemonContext(**daemon_args) - __init__.__doc__ = BaseCore.__init__.__doc__.split('.. -----')[0] + __init__.__doc__ = NetworkCore.__init__.__doc__.split('.. -----')[0] def _dispatch(self, method, args, dispatch_dict): """ Dispatch XML-RPC method calls @@ -94,25 +95,25 @@ class Core(BaseCore): except LockTimeout: err = sys.exc_info()[1] self.logger.error("Failed to daemonize %s: Failed to acquire lock " - "on %s" % (self.name, self.setup['daemon'])) + "on %s" % (self.name, + Bcfg2.Options.setup.daemon)) return False def _run(self): """ Create :attr:`server` to start the server listening. """ - hostname, port = urlparse(self.setup['location'])[1].split(':') + hostname, port = urlparse(Bcfg2.Options.setup.server)[1].split(':') server_address = socket.getaddrinfo(hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM)[0][4] try: - self.server = XMLRPCServer(self.setup['listen_all'], + self.server = XMLRPCServer(Bcfg2.Options.setup.listen_all, server_address, - keyfile=self.setup['key'], - certfile=self.setup['cert'], + keyfile=Bcfg2.Options.setup.key, + certfile=Bcfg2.Options.setup.cert, register=False, timeout=1, - ca=self.setup['ca'], - protocol=self.setup['protocol']) + ca=Bcfg2.Options.setup.ca) except: # pylint: disable=W0702 err = sys.exc_info()[1] self.logger.error("Server startup failed: %s" % err) diff --git a/src/lib/Bcfg2/Server/CherryPyCore.py b/src/lib/Bcfg2/Server/CherrypyCore.py index bf3be72f9..dbfe260f7 100644 --- a/src/lib/Bcfg2/Server/CherryPyCore.py +++ b/src/lib/Bcfg2/Server/CherrypyCore.py @@ -5,7 +5,7 @@ import sys import time import Bcfg2.Server.Statistics from Bcfg2.Compat import urlparse, xmlrpclib, b64decode -from Bcfg2.Server.Core import BaseCore +from Bcfg2.Server.Core import NetworkCore import cherrypy from cherrypy.lib import xmlrpcutil from cherrypy._cptools import ErrorTool @@ -27,7 +27,7 @@ def on_error(*args, **kwargs): # pylint: disable=W0613 cherrypy.tools.xmlrpc_error = ErrorTool(on_error) -class Core(BaseCore): +class CherrypyCore(NetworkCore): """ The CherryPy-based server core. """ #: Base CherryPy config for this class. We enable the @@ -37,7 +37,7 @@ class Core(BaseCore): 'tools.bcfg2_authn.on': True} def __init__(self): - BaseCore.__init__(self) + NetworkCore.__init__(self) cherrypy.tools.bcfg2_authn = cherrypy.Tool('on_start_resource', self.do_authn) @@ -45,11 +45,11 @@ class Core(BaseCore): #: List of exposed plugin RMI self.rmi = self._get_rmi() cherrypy.engine.subscribe('stop', self.shutdown) - __init__.__doc__ = BaseCore.__init__.__doc__.split('.. -----')[0] + __init__.__doc__ = NetworkCore.__init__.__doc__.split('.. -----')[0] def do_authn(self): """ Perform authentication by calling - :func:`Bcfg2.Server.Core.BaseCore.authenticate`. This is + :func:`Bcfg2.Server.Core.NetworkCore.authenticate`. This is implemented as a CherryPy tool.""" try: header = cherrypy.request.headers['Authorization'] @@ -115,36 +115,36 @@ class Core(BaseCore): with :class:`cherrypy.process.plugins.Daemonizer`, and write a PID file with :class:`cherrypy.process.plugins.PIDFile`. """ DropPrivileges(cherrypy.engine, - uid=self.setup['daemon_uid'], - gid=self.setup['daemon_gid'], - umask=int(self.setup['umask'], 8)).subscribe() + uid=Bcfg2.Options.setup.daemon_uid, + gid=Bcfg2.Options.setup.daemon_gid, + umask=int(Bcfg2.Options.setup.umask, 8)).subscribe() Daemonizer(cherrypy.engine).subscribe() - PIDFile(cherrypy.engine, self.setup['daemon']).subscribe() + PIDFile(cherrypy.engine, Bcfg2.Options.setup.daemon).subscribe() return True def _run(self): """ Start the server listening. """ - hostname, port = urlparse(self.setup['location'])[1].split(':') - if self.setup['listen_all']: + hostname, port = urlparse(Bcfg2.Options.setup.server)[1].split(':') + if Bcfg2.Options.setup.listen_all: hostname = '0.0.0.0' config = {'engine.autoreload.on': False, 'server.socket_port': int(port), 'server.socket_host': hostname} - if self.setup['cert'] and self.setup['key']: + if Bcfg2.Options.setup.cert and Bcfg2.Options.setup.key: config.update({'server.ssl_module': 'pyopenssl', - 'server.ssl_certificate': self.setup['cert'], - 'server.ssl_private_key': self.setup['key']}) - if self.setup['debug']: + 'server.ssl_certificate': Bcfg2.Options.setup.cert, + 'server.ssl_private_key': Bcfg2.Options.setup.key}) + if Bcfg2.Options.setup.debug: config['log.screen'] = True cherrypy.config.update(config) - cherrypy.tree.mount(self, '/', {'/': self.setup}) + cherrypy.tree.mount(self, '/', {'/': Bcfg2.Options.setup}) cherrypy.engine.start() return True def _block(self): """ Enter the blocking infinite server - loop. :func:`Bcfg2.Server.Core.BaseCore.shutdown` is called on + loop. :func:`Bcfg2.Server.Core.NetworkCore.shutdown` is called on exit by a :meth:`subscription <cherrypy.process.wspbus.Bus.subscribe>` on the top-level CherryPy engine.""" diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index 7aa07f2a2..58044447b 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -13,12 +13,12 @@ import inspect import lxml.etree import Bcfg2.Server import Bcfg2.Logger +import Bcfg2.Options import Bcfg2.settings import Bcfg2.Server.Statistics import Bcfg2.Server.FileMonitor from itertools import chain from Bcfg2.Server.Cache import Cache -from Bcfg2.Options import get_option_parser, SERVER_FAM_IGNORE from Bcfg2.Compat import xmlrpclib # pylint: disable=W0622 from Bcfg2.Server.Plugin.exceptions import * # pylint: disable=W0401,W0614 from Bcfg2.Server.Plugin.interfaces import * # pylint: disable=W0401,W0614 @@ -82,43 +82,40 @@ class NoExposedMethod (Exception): # in core we frequently want to catch all exceptions, regardless of # type, so disable the pylint rule that catches that. - -class BaseCore(object): +class Core(object): """ The server core is the container for all Bcfg2 server logic and modules. All core implementations must inherit from - ``BaseCore``. """ + ``Core``. """ + + options = [ + Bcfg2.Options.Common.plugins, + Bcfg2.Options.Common.repository, + Bcfg2.Options.Common.filemonitor, + Bcfg2.Options.BooleanOption( + cf=('server', 'fam_blocking'), default=False, + help='FAM blocks on startup until all events are processed'), + Bcfg2.Options.BooleanOption( + cf=('logging', 'performance'), dest="perflog", + help="Periodically log performance statistics"), + Bcfg2.Options.Option( + cf=('logging', 'performance_interval'), default=300.0, + type=Bcfg2.Options.Types.timeout, + help="Performance statistics logging interval in seconds"), + Bcfg2.Options.Option( + cf=('caching', 'client_metadata'), dest='client_metadata_cache', + default='off', + choices=['off', 'on', 'initial', 'cautious', 'aggressive'])] def __init__(self): # pylint: disable=R0912,R0915 """ - .. automethod:: _daemonize .. automethod:: _run .. automethod:: _block .. ----- .. automethod:: _file_monitor_thread .. automethod:: _perflog_thread """ - #: The Bcfg2 options dict - self.setup = get_option_parser() - #: The Bcfg2 repository directory - self.datastore = self.setup['repo'] - - if self.setup['debug']: - level = logging.DEBUG - elif self.setup['verbose']: - level = logging.INFO - else: - level = logging.WARNING - # we set a higher log level for the console by default. we - # assume that if someone is running bcfg2-server in such a way - # that it _can_ log to console, they want more output. if - # level is set to DEBUG, that will get handled by - # setup_logging and the console will get DEBUG output. - Bcfg2.Logger.setup_logging('bcfg2-server', - to_console=logging.INFO, - to_syslog=self.setup['syslog'], - to_file=self.setup['logging'], - level=level) + self.datastore = Bcfg2.Options.setup.repository #: A :class:`logging.Logger` object for use by the core self.logger = logging.getLogger('bcfg2-server') @@ -130,43 +127,32 @@ class BaseCore(object): #: special, and will be used for any log handlers whose name #: does not appear elsewhere in the dict. At a minimum, #: ``default`` must be provided. - self._loglevels = {True: dict(default=logging.DEBUG), - False: dict(console=logging.INFO, - default=level)} + self._loglevels = { + True: dict(default=logging.DEBUG), + False: dict(console=logging.INFO, + default=Bcfg2.Logger.default_log_level())} #: Used to keep track of the current debug state of the core. self.debug_flag = False # enable debugging on the core now. debugging is enabled on # everything else later - if self.setup['debug']: - self.set_core_debug(None, self.setup['debug']) - - if 'ignore' not in self.setup: - self.setup.add_option('ignore', SERVER_FAM_IGNORE) - self.setup.reparse() - - famargs = dict(filemonitor=self.setup['filemonitor'], - debug=self.setup['debug'], - ignore=self.setup['ignore']) - if self.setup['filemonitor'] not in Bcfg2.Server.FileMonitor.available: - self.logger.error("File monitor driver %s not available; " - "forcing to default" % self.setup['filemonitor']) - famargs['filemonitor'] = 'default' + if Bcfg2.Options.setup.debug: + self.set_core_debug(None, Bcfg2.Options.setup.debug) try: #: The :class:`Bcfg2.Server.FileMonitor.FileMonitor` #: object used by the core to monitor for Bcfg2 data #: changes. - self.fam = Bcfg2.Server.FileMonitor.load_fam(**famargs) + self.fam = Bcfg2.Server.FileMonitor.get_fam() except IOError: msg = "Failed to instantiate fam driver %s" % \ - self.setup['filemonitor'] + Bcfg2.Options.setup.filemonitor self.logger.error(msg, exc_info=1) raise CoreInitError(msg) #: Path to bcfg2.conf - self.cfile = self.setup['configfile'] + self.cfile = Bcfg2.Options.setup.config #: Dict of plugins that are enabled. Keys are the plugin #: names (just the plugin name, in the correct case; e.g., @@ -198,59 +184,19 @@ class BaseCore(object): # generate Django ORM settings. this must be done _before_ we # load plugins - Bcfg2.settings.read_config(repo=self.datastore) - - #: Whether or not it's possible to use the Django database - #: backend for plugins that have that capability - self._database_available = False - if Bcfg2.settings.HAS_DJANGO: - db_settings = Bcfg2.settings.DATABASES['default'] - if ('daemon' in self.setup and 'daemon_uid' in self.setup and - self.setup['daemon'] and self.setup['daemon_uid'] and - db_settings['ENGINE'].endswith(".sqlite3") and - not os.path.exists(db_settings['NAME'])): - # syncdb will create the sqlite database, and we're - # going to daemonize, dropping privs to a non-root - # user, so we need to chown the database after - # creating it - do_chown = True - else: - do_chown = False - - from django.core.exceptions import ImproperlyConfigured - from django.core import management - try: - management.call_command("syncdb", interactive=False, - verbosity=0) - self._database_available = True - except ImproperlyConfigured: - err = sys.exc_info()[1] - self.logger.error("Django configuration problem: %s" % err) - except: - err = sys.exc_info()[1] - self.logger.error("Database update failed: %s" % err) - - if do_chown and self._database_available: - try: - os.chown(db_settings['NAME'], - self.setup['daemon_uid'], - self.setup['daemon_gid']) - except OSError: - err = sys.exc_info()[1] - self.logger.error("Failed to set ownership of database " - "at %s: %s" % (db_settings['NAME'], err)) - - #: The CA that signed the server cert - self.ca = self.setup['ca'] + Bcfg2.settings.read_config() #: The FAM :class:`threading.Thread`, #: :func:`_file_monitor_thread` self.fam_thread = \ - threading.Thread(name="%sFAMThread" % self.setup['filemonitor'], + threading.Thread(name="%sFAMThread" % + Bcfg2.Options.setup.filemonitor.__name__, target=self._file_monitor_thread) + #: The :class:`threading.Thread` that reports performance + #: statistics to syslog. self.perflog_thread = None - if self.setup['perflog']: + if Bcfg2.Options.setup.perflog: self.perflog_thread = \ threading.Thread(name="PerformanceLoggingThread", target=self._perflog_thread) @@ -263,6 +209,24 @@ class BaseCore(object): #: metadata self.metadata_cache = Cache() + #: Whether or not it's possible to use the Django database + #: backend for plugins that have that capability + self._database_available = False + if Bcfg2.settings.HAS_DJANGO: + from django.core.exceptions import ImproperlyConfigured + from django.core import management + try: + management.call_command("syncdb", interactive=False, + verbosity=0) + self._database_available = True + except ImproperlyConfigured: + err = sys.exc_info()[1] + self.logger.error("Django configuration problem: %s" % err) + except: + err = sys.exc_info()[1] + self.logger.error("Updating database %s failed: %s" % + (Bcfg2.Options.setup.db_name, err)) + def plugins_by_type(self, base_cls): """ Return a list of loaded plugins that match the passed type. @@ -288,7 +252,7 @@ class BaseCore(object): to syslog. """ self.logger.debug("Performance logging thread starting") while not self.terminate.isSet(): - self.terminate.wait(self.setup['perflog_interval']) + self.terminate.wait(Bcfg2.Options.setup.performance_interval) for name, stats in self.get_statistics(None).items(): self.logger.info("Performance statistics: " "%s min=%.06f, max=%.06f, average=%.06f, " @@ -338,10 +302,7 @@ class BaseCore(object): :attr:`Bcfg2.Server.Core.BaseCore.metadata` as side effects. This does not start plugin threads; that is done later, in :func:`Bcfg2.Server.Core.BaseCore.run` """ - while '' in self.setup['plugins']: - self.setup['plugins'].remove('') - - for plugin in self.setup['plugins']: + for plugin in Bcfg2.Options.setup.plugins: if not plugin in self.plugins: self.init_plugin(plugin) @@ -381,10 +342,6 @@ class BaseCore(object): "failed to instantiate Core") raise CoreInitError("No Metadata Plugin") - if self.debug_flag: - # enable debugging on plugins - self.plugins[plugin].set_debug(self.debug_flag) - def init_plugin(self, plugin): """ Import and instantiate a single plugin. The plugin is stored to :attr:`plugins`. @@ -395,29 +352,13 @@ class BaseCore(object): :type plugin: string :returns: None """ - self.logger.debug("Loading plugin %s" % plugin) - try: - mod = getattr(__import__("Bcfg2.Server.Plugins.%s" % - (plugin)).Server.Plugins, plugin) - except ImportError: - try: - mod = __import__(plugin, globals(), locals(), - [plugin.split('.')[-1]]) - except: - self.logger.error("Failed to load plugin %s" % plugin) - return - try: - plug = getattr(mod, plugin.split('.')[-1]) - except AttributeError: - self.logger.error("Failed to load plugin %s: %s" % - (plugin, sys.exc_info()[1])) - return + self.logger.debug("Loading plugin %s" % plugin.name) # Blacklist conflicting plugins - cplugs = [conflict for conflict in plug.conflicts + cplugs = [conflict for conflict in plugin.conflicts if conflict in self.plugins] - self.plugin_blacklist[plug.name] = cplugs + self.plugin_blacklist[plugin.name] = cplugs try: - self.plugins[plugin] = plug(self, self.datastore) + self.plugins[plugin.name] = plugin(self, self.datastore) except PluginInitError: self.logger.error("Failed to instantiate plugin %s" % plugin, exc_info=1) @@ -445,10 +386,7 @@ class BaseCore(object): """ Get the client :attr:`metadata_cache` mode. Options are off, initial, cautious, aggressive, on (synonym for cautious). See :ref:`server-caching` for more details. """ - # pylint: disable=E1103 - mode = self.setup.cfp.get("caching", "client_metadata", - default="off").lower() - # pylint: enable=E1103 + mode = Bcfg2.Options.setup.client_metadata_cache if mode == "on": return "cautious" else: @@ -632,10 +570,9 @@ class BaseCore(object): del entry.attrib['realname'] return ret except: - entry.set('name', oldname) self.logger.error("Failed binding entry %s:%s with altsrc %s" % - (entry.tag, entry.get('name'), - entry.get('altsrc'))) + (entry.tag, oldname, entry.get('name'))) + entry.set('name', oldname) self.logger.error("Falling back to %s:%s" % (entry.tag, entry.get('name'))) @@ -729,39 +666,16 @@ class BaseCore(object): return if event.code2str() == 'deleted': return - self.setup.reparse() + Bcfg2.Options.get_parser().reparse() self.metadata_cache.expire() def run(self): - """ Run the server core. This calls :func:`_daemonize`, - :func:`_run`, starts the :attr:`fam_thread`, and calls - :func:`_block`, but note that it is the responsibility of the - server core implementation to call :func:`shutdown` under - normal operation. This also handles creation of the directory - containing the pidfile, if necessary. """ - if self.setup['daemon']: - # if we're dropping privs, then the pidfile is likely - # /var/run/bcfg2-server/bcfg2-server.pid or similar. - # since some OSes clean directories out of /var/run on - # reboot, we need to ensure that the directory containing - # the pidfile exists and has the appropriate permissions - piddir = os.path.dirname(self.setup['daemon']) - if not os.path.exists(piddir): - os.makedirs(piddir) - os.chown(piddir, - self.setup['daemon_uid'], - self.setup['daemon_gid']) - os.chmod(piddir, 493) # 0775 - if not self._daemonize(): - return False - - # rewrite $HOME. pulp stores its auth creds in ~/.pulp, so - # this is necessary to make that work when privileges are - # dropped - os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5] - else: - os.umask(int(self.setup['umask'], 8)) - + """ Run the server core. This calls :func:`_run`, starts the + :attr:`fam_thread`, and calls :func:`_block`, but note that it + is the responsibility of the server core implementation to + call :func:`shutdown` under normal operation. This also + handles creation of the directory containing the pidfile, if + necessary.""" if not self._run(): self.shutdown() return False @@ -781,20 +695,13 @@ class BaseCore(object): self.shutdown() raise - if self.setup['fam_blocking']: + if Bcfg2.Options.setup.fam_blocking: time.sleep(1) while self.fam.pending() != 0: time.sleep(1) - if self.debug_flag: - self.set_debug(None, self.debug_flag) self._block() - def _daemonize(self): - """ Daemonize the server and write the pidfile. This must be - overridden by a core implementation. """ - raise NotImplementedError - def _run(self): """ Start up the server; this method should return immediately. This must be overridden by a core @@ -852,9 +759,13 @@ class BaseCore(object): if all(ip_checks): # if all ACL plugins return True (allow), then allow + self.logger.debug("Client %s passed IP-based ACL checks for %s" % + (address[0], rmi)) return True elif False in ip_checks: # if any ACL plugin returned False (deny), then deny + self.logger.warning("Client %s failed IP-based ACL checks for %s" % + (address[0], rmi)) return False # else, no plugins returned False, but not all plugins # returned True, so some plugin returned None (defer), so @@ -862,7 +773,16 @@ class BaseCore(object): client, metadata = self.resolve_client(address) try: - return all(p.check_acl_metadata(metadata, rmi) for p in plugins) + rv = all(p.check_acl_metadata(metadata, rmi) for p in plugins) + if rv: + self.logger.debug( + "Client %s passed metadata ACL checks for %s" % + (metadata.hostname, rmi)) + else: + self.logger.warning( + "Client %s failed metadata ACL checks for %s" % + (metadata.hostname, rmi)) + return rv except: self.logger.error("Unexpected error checking ACLs for %s for %s: " "%s" % (client, rmi, sys.exc_info()[1])) @@ -1186,36 +1106,6 @@ class BaseCore(object): self.process_statistics(client, sdata) return True - def authenticate(self, cert, user, password, address): - """ Authenticate a client connection with - :func:`Bcfg2.Server.Plugin.interfaces.Metadata.AuthenticateConnection`. - - :param cert: an x509 certificate - :type cert: dict - :param user: The username of the user trying to authenticate - :type user: string - :param password: The password supplied by the client - :type password: string - :param address: An address pair of ``(<ip address>, <port>)`` - :type address: tuple - :return: bool - True if the authenticate succeeds, False otherwise - """ - if self.ca: - acert = cert - else: - # No ca, so no cert validation can be done - acert = None - return self.metadata.AuthenticateConnection(acert, user, password, - address) - - def check_acls(self, client_ip): - """ Check if client IP is in list of accepted IPs """ - try: - return self.plugins['Acl'].config.check_acl(client_ip) - except KeyError: - # No ACL means accept all incoming ips - return True - @exposed def GetDecisionList(self, address, mode): """ Get the decision list for the client with :func:`GetDecisions`. @@ -1332,3 +1222,109 @@ class BaseCore(object): address[0]) return "This method is deprecated and will be removed in a future " + \ "release\n%s" % self.fam.set_debug(debug) + + +class NetworkCore(Core): + """ A server core that actually listens on the network, can be + daemonized, etc.""" + options = Core.options + [ + Bcfg2.Options.Common.daemon, Bcfg2.Options.Common.syslog, + Bcfg2.Options.Common.location, Bcfg2.Options.Common.ssl_key, + Bcfg2.Options.Common.ssl_cert, Bcfg2.Options.Common.ssl_ca, + Bcfg2.Options.BooleanOption( + '--listen-all', cf=('server', 'listen_all'), default=False, + help="Listen on all interfaces"), + Bcfg2.Options.Option( + cf=('server', 'umask'), default='0077', help='Server umask', + type=Bcfg2.Options.Types.octal), + Bcfg2.Options.Option( + cf=('server', 'user'), default=0, dest='daemon_uid', + type=Bcfg2.Options.Types.username, + help="User to run the server daemon as"), + Bcfg2.Options.Option( + cf=('server', 'group'), default=0, dest='daemon_gid', + type=Bcfg2.Options.Types.groupname, + help="Group to run the server daemon as")] + + def __init__(self): + Core.__init__(self) + + #: The CA that signed the server cert + self.ca = Bcfg2.Options.setup.ca + + if self._database_available: + db_settings = Bcfg2.settings.DATABASES['default'] + if (Bcfg2.Options.setup.daemon and + Bcfg2.Options.setup.daemon_uid and + db_settings['ENGINE'].endswith(".sqlite3") and + not os.path.exists(db_settings['NAME'])): + # syncdb will create the sqlite database, and we're + # going to daemonize, dropping privs to a non-root + # user, so we need to chown the database after + # creating it + try: + os.chown(db_settings['NAME'], + Bcfg2.Options.setup.daemon_uid, + Bcfg2.Options.setup.daemon_gid) + except OSError: + err = sys.exc_info()[1] + self.logger.error("Failed to set ownership of database " + "at %s: %s" % (db_settings['NAME'], err)) + __init__.__doc__ = Core.__init__.__doc__.split(".. -----")[0] + \ +"\n.. automethod:: _daemonize\n" + + def run(self): + """ Run the server core. This calls :func:`_daemonize` before + calling :func:`Bcfg2.Server.Core.Core.run` to run the server + core. """ + if Bcfg2.Options.setup.daemon: + # if we're dropping privs, then the pidfile is likely + # /var/run/bcfg2-server/bcfg2-server.pid or similar. + # since some OSes clean directories out of /var/run on + # reboot, we need to ensure that the directory containing + # the pidfile exists and has the appropriate permissions + piddir = os.path.dirname(Bcfg2.Options.setup.daemon) + if not os.path.exists(piddir): + os.makedirs(piddir) + os.chown(piddir, + Bcfg2.Options.setup.daemon_uid, + Bcfg2.Options.setup.daemon_gid) + os.chmod(piddir, 493) # 0775 + if not self._daemonize(): + return False + + # rewrite $HOME. pulp stores its auth creds in ~/.pulp, so + # this is necessary to make that work when privileges are + # dropped + os.environ['HOME'] = pwd.getpwuid(self.setup['daemon_uid'])[5] + else: + os.umask(int(Bcfg2.Options.setup.umask, 8)) + + Core.run(self) + + def authenticate(self, cert, user, password, address): + """ Authenticate a client connection with + :func:`Bcfg2.Server.Plugin.interfaces.Metadata.AuthenticateConnection`. + + :param cert: an x509 certificate + :type cert: dict + :param user: The username of the user trying to authenticate + :type user: string + :param password: The password supplied by the client + :type password: string + :param address: An address pair of ``(<ip address>, <port>)`` + :type address: tuple + :return: bool - True if the authenticate succeeds, False otherwise + """ + if self.ca: + acert = cert + else: + # No ca, so no cert validation can be done + acert = None + return self.metadata.AuthenticateConnection(acert, user, password, + address) + + def _daemonize(self): + """ Daemonize the server and write the pidfile. This must be + overridden by a core implementation. """ + raise NotImplementedError diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..7e04b1eae 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -7,9 +7,10 @@ processes. As such, it requires Python 2.6+. import threading import lxml.etree import multiprocessing +import Bcfg2.Options from Bcfg2.Compat import Queue -from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from Bcfg2.Server.Core import Core, exposed +from Bcfg2.Server.BuiltinCore import BuiltinCore class DualEvent(object): @@ -48,7 +49,7 @@ class DualEvent(object): return self._threading_event.wait(timeout=timeout) -class ChildCore(BaseCore): +class ChildCore(Core): """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`. This core builds configurations from a given :class:`multiprocessing.Pipe`. Note that this is a full-fledged @@ -67,10 +68,8 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 5.0 - def __init__(self, setup, pipe, terminate): + def __init__(self, pipe, terminate): """ - :param setup: A Bcfg2 options dict - :type setup: Bcfg2.Options.OptionParser :param pipe: The pipe to which client hostnames are added for ChildCore objects to build configurations, and to which client configurations are added after @@ -80,7 +79,7 @@ class ChildCore(BaseCore): themselves down. :type terminate: multiprocessing.Event """ - BaseCore.__init__(self, setup) + Core.__init__(self) #: The pipe to which client hostnames are added for ChildCore #: objects to build configurations, and to which client @@ -123,7 +122,7 @@ class ChildCore(BaseCore): self.shutdown() -class Core(BuiltinCore): +class MultiprocessingCore(BuiltinCore): """ A multiprocessing core that delegates building the actual client configurations to :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects. The @@ -131,14 +130,20 @@ class Core(BuiltinCore): :func:`GetConfig` are delegated to children. All other calls are handled by the parent process. """ + options = BuiltinCore.options + [ + Bcfg2.Options.Option( + '--children', dest="core_children", + cf=('server', 'children'), type=int, + default=multiprocessing.cpu_count(), + help='Spawn this number of children for the multiprocessing core')] + + #: How long to wait for a child process to shut down cleanly #: before it is terminated. shutdown_timeout = 10.0 - def __init__(self, setup): - BuiltinCore.__init__(self, setup) - if setup['children'] is None: - setup['children'] = multiprocessing.cpu_count() + def __init__(self): + BuiltinCore.__init__(self) #: A dict of child name -> one end of the #: :class:`multiprocessing.Pipe` object used to communicate @@ -152,7 +157,8 @@ class Core(BuiltinCore): #: when it's done. This lets us use a blocking call to #: :func:`Queue.Queue.get` when waiting for an available #: child. - self.available_children = Queue(maxsize=self.setup['children']) + self.available_children = \ + Queue(maxsize=Bcfg2.Options.setup.core_children) # sigh. multiprocessing was added in py2.6, which is when the # camelCase methods for threading objects were deprecated in @@ -165,12 +171,12 @@ class Core(BuiltinCore): self.terminate = DualEvent(threading_event=self.terminate) def _run(self): - for cnum in range(self.setup['children']): + for cnum in range(Bcfg2.Options.setup.core_children): name = "Child-%s" % cnum (mainpipe, childpipe) = multiprocessing.Pipe() self.pipes[name] = mainpipe self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, self.terminate) + childcore = ChildCore(childpipe, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, diff --git a/src/lib/Bcfg2/Server/SSLServer.py b/src/lib/Bcfg2/Server/SSLServer.py index 8bdcf0500..646124fcc 100644 --- a/src/lib/Bcfg2/Server/SSLServer.py +++ b/src/lib/Bcfg2/Server/SSLServer.py @@ -15,6 +15,10 @@ from Bcfg2.Compat import xmlrpclib, SimpleXMLRPCServer, SocketServer, \ b64decode +class XMLRPCACLCheckException(Exception): + """ Raised when ACL checks fail on an RPC request """ + + class XMLRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): """ An XML-RPC dispatcher. """ @@ -33,6 +37,8 @@ class XMLRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): def _marshaled_dispatch(self, address, data): params, method = xmlrpclib.loads(data) + if not self.instance.check_acls(address, method): + raise XMLRPCACLCheckException try: if '.' not in method: params = (address, ) + params @@ -42,12 +48,12 @@ class XMLRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): response = (response.decode('utf-8'), ) else: response = (response, ) - raw_response = xmlrpclib.dumps(response, methodresponse=1, + raw_response = xmlrpclib.dumps(response, methodresponse=True, allow_none=self.allow_none, encoding=self.encoding) except xmlrpclib.Fault: fault = sys.exc_info()[1] - raw_response = xmlrpclib.dumps(fault, + raw_response = xmlrpclib.dumps(fault, methodresponse=True, allow_none=self.allow_none, encoding=self.encoding) except: @@ -56,7 +62,8 @@ class XMLRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): # report exception back to server raw_response = xmlrpclib.dumps( xmlrpclib.Fault(1, "%s:%s" % (err[0].__name__, err[1])), - allow_none=self.allow_none, encoding=self.encoding) + methodresponse=True, allow_none=self.allow_none, + encoding=self.encoding) return raw_response @@ -209,9 +216,8 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): password = "" cert = self.request.getpeercert() client_address = self.request.getpeername() - return (self.server.instance.authenticate(cert, username, - password, client_address) and - self.server.instance.check_acls(client_address[0])) + return self.server.instance.authenticate(cert, username, + password, client_address) def parse_request(self): """Extends parse_request. @@ -241,7 +247,7 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): try: select.select([self.rfile.fileno()], [], [], 3) except select.error: - print("got select timeout") + self.logger.error("Got select timeout") raise chunk_size = min(size_remaining, max_chunk_size) L.append(self.rfile.read(chunk_size).decode('utf-8')) @@ -251,7 +257,12 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): data) if sys.hexversion >= 0x03000000: response = response.encode('utf-8') + except XMLRPCACLCheckException: + self.send_error(401, self.responses[401][0]) + self.end_headers() except: # pylint: disable=W0702 + self.logger.error("Unexpected dispatch error for %s: %s" % + (self.client_address, sys.exc_info()[1])) try: self.send_response(500) self.end_headers() @@ -262,12 +273,7 @@ class XMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): raise else: # got a valid XML RPC response - # first, check ACLs client_address = self.request.getpeername() - method = xmlrpclib.loads(data)[1] - if not self.server.instance.check_acls(client_address, method): - self.send_error(401, self.responses[401][0]) - self.end_headers() try: self.send_response(200) self.send_header("Content-type", "text/xml") |