diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-08 08:10:16 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2013-08-08 08:10:16 -0400 |
commit | e1e194a573b3803fa7f45a646bbb36b2f164a3e1 (patch) | |
tree | e9b689d1be39d38279e0a16f010e8d5e573612ef | |
parent | 35851347089db1a092ec715cb183aec19f19e983 (diff) | |
parent | eef441c1acdf1d3d483647b153f721cbab4a8517 (diff) | |
download | bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.gz bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.bz2 bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.zip |
Merge branch 'maint'
Conflicts:
doc/appendix/files/mysql.txt
doc/getting_started/index.txt
doc/server/plugins/structures/bundler/kernel.txt
src/lib/Bcfg2/Server/MultiprocessingCore.py
src/lib/Bcfg2/Server/Plugin/interfaces.py
src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
src/lib/Bcfg2/Server/Plugins/Probes.py
src/lib/Bcfg2/Server/Plugins/SSHbase.py
31 files changed, 601 insertions, 246 deletions
diff --git a/doc/appendix/files/mysql.txt b/doc/appendix/files/mysql.txt index a84beb3f8..0dbbe9b05 100644 --- a/doc/appendix/files/mysql.txt +++ b/doc/appendix/files/mysql.txt @@ -1,4 +1,5 @@ .. -*- mode: rst -*- +.. vim: ft=rst .. _appendix-files-mysql: @@ -17,7 +18,7 @@ I added a new bundle: <Bundle> <Path name="/root/bcfg2-install/mysql/users.sh"/> <Path name="/root/bcfg2-install/mysql/users.sql"/> - <Action name="mysql_users"/> + <Action name="users.sh"/> <Package name="mysql-server-4.1"/> <Service name="mysql"/> </Bundle> diff --git a/doc/getting_started/index.txt b/doc/getting_started/index.txt index 378c44a3a..135346e41 100644 --- a/doc/getting_started/index.txt +++ b/doc/getting_started/index.txt @@ -1,4 +1,5 @@ .. -*- mode: rst -*- +.. vim: ft=rst .. _getting_started-index: @@ -115,7 +116,7 @@ files: ``clients.xml`` and ``groups.xml``. Your current .. code-block:: xml - <Clients version="3.0"> + <Clients> <Client profile="basic" pingable="Y" pingtime="0" name="bcfg-server.example.com"/> </Clients> @@ -132,7 +133,7 @@ Our simple ``groups.xml`` file looks like: .. code-block:: xml - <Groups version='3.0'> + <Groups> <Group profile='true' public='false' name='basic'> <Group name='suse'/> </Group> @@ -205,7 +206,11 @@ real ``/etc/motd`` file to that location, run the client again, and you will find that we now have a correct entry:: Loaded tool drivers: +<<<<<<< HEAD Chkconfig POSIX Action RPM +======= + Chkconfig POSIX YUM +>>>>>>> maint Phase: initial Correct entries: 1 diff --git a/doc/server/plugins/generators/rules.txt b/doc/server/plugins/generators/rules.txt index 8e2077b50..a95d4a2a4 100644 --- a/doc/server/plugins/generators/rules.txt +++ b/doc/server/plugins/generators/rules.txt @@ -1,4 +1,5 @@ .. -*- mode: rst -*- +.. vim: ft=rst .. _server-plugins-generators-rules: @@ -41,7 +42,7 @@ Rules Tag .. xml:element:: Rules :linktotype: :noautodep: - :inlinetypes: PostInstall,RContainerType + :inlinetypes: RContainerType Package Tag ----------- diff --git a/doc/server/plugins/structures/bundler/kernel.txt b/doc/server/plugins/structures/bundler/kernel.txt index e61d21476..54f70606f 100644 --- a/doc/server/plugins/structures/bundler/kernel.txt +++ b/doc/server/plugins/structures/bundler/kernel.txt @@ -1,4 +1,5 @@ .. -*- mode: rst -*- +.. vim: ft=rst .. _server-plugins-structures-bundler-kernel: @@ -30,7 +31,7 @@ some of which might be better than this one. Feel free to hack as needed. <Path name='/boot/initrd'/> <Path name='/boot/vmlinuz.old'/> <Path name='/boot/initrd.old'/> - <Action name='lilo'/> + <BoundAction name='lilo' command='/sbin/lilo' timing='post' when='modified'/> <!-- Current kernel --> <Package name='linux-2.4.21-314.tg1'/> <Package name='linux-2.4.21-314.tg1-source'/> diff --git a/redhat/scripts/bcfg2-report-collector.init b/redhat/scripts/bcfg2-report-collector.init index 43e875a6b..3c112006d 100755 --- a/redhat/scripts/bcfg2-report-collector.init +++ b/redhat/scripts/bcfg2-report-collector.init @@ -17,7 +17,7 @@ ### END INIT INFO # Include lsb functions -. /etc//init.d/functions +. /etc/init.d/functions # Commonly used stuff DAEMON=/usr/sbin/bcfg2-report-collector @@ -25,7 +25,7 @@ PIDFILE=/var/run/bcfg2-server/bcfg2-report-collector.pid PARAMS="-D $PIDFILE" # Include default startup configuration if exists -test -f "/etc/sysconfig/bcfg2-server" && . /etc/sysconfig/bcfg2-server +test -f "/etc/sysconfig/bcfg2-report-collector" && . /etc/sysconfig/bcfg2-report-collector # Exit if $DAEMON doesn't exist and is not executable test -x $DAEMON || exit 5 diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py index 3778569a6..fb5d06e54 100644 --- a/src/lib/Bcfg2/Client/Tools/POSIX/base.py +++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py @@ -686,7 +686,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): """ os.makedirs helpfully creates all parent directories for us, but it sets permissions according to umask, which is probably wrong. we need to find out which directories were - created and set permissions on those + created and try to set permissions on those (http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125 and http://trac.mcs.anl.gov/projects/bcfg2/ticket/1134) """ created = [] @@ -706,8 +706,9 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): (path, err)) rv = False - # set auto-created directories to mode 755, if you need - # something else, you should specify it in your config + # set auto-created directories to mode 755 and use best effort for + # permissions. If you need something else, you should specify it in + # your config. tmpentry = copy.deepcopy(entry) tmpentry.set('mode', '0755') for acl in tmpentry.findall('ACL'): @@ -715,7 +716,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): oct_mode(self._norm_acl_perms(acl.get('perms')) | ACL_MAP['x'])) for cpath in created: - rv &= self._set_perms(tmpentry, path=cpath) + self._set_perms(tmpentry, path=cpath) return rv diff --git a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py index 9b40f9cea..9d7441b5c 100644 --- a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py +++ b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py @@ -196,7 +196,10 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool): # automatically determine one -- i.e., it always # verifies continue - if val != entry.get(attr): + entval = entry.get(attr) + if not isinstance(entval, str): + entval = entval.encode('utf-8') + if val != entval: errors.append("%s for %s %s is incorrect. Current %s is " "%s, but should be %s" % (attr.title(), entry.tag, entry.get("name"), diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index 41bf54dfb..a96ea9a3b 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -1198,9 +1198,9 @@ DRIVER_OPTIONS = \ yum_verify_fail_action=CLIENT_YUM_VERIFY_FAIL_ACTION, yum_verify_flags=CLIENT_YUM_VERIFY_FLAGS, posix_uid_whitelist=CLIENT_POSIX_UID_WHITELIST, - posix_gid_whitelist=CLIENT_POSIX_UID_WHITELIST, + posix_gid_whitelist=CLIENT_POSIX_GID_WHITELIST, posix_uid_blacklist=CLIENT_POSIX_UID_BLACKLIST, - posix_gid_blacklist=CLIENT_POSIX_UID_BLACKLIST) + posix_gid_blacklist=CLIENT_POSIX_GID_BLACKLIST) CLIENT_COMMON_OPTIONS = \ dict(extra=CLIENT_EXTRA_DISPLAY, diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index 63e2c2910..698703457 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -263,6 +263,20 @@ class BaseCore(object): #: metadata self.metadata_cache = Cache() + def expire_caches_by_type(self, base_cls, key=None): + """ Expire caches for all + :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that + are instances of ``base_cls``. + + :param base_cls: The base plugin interface class to match (see + :mod:`Bcfg2.Server.Plugin.interfaces`) + :type base_cls: type + :param key: The cache key to expire + """ + for plugin in self.plugins_by_type(base_cls): + if isinstance(plugin, Bcfg2.Server.Plugin.Caching): + plugin.expire_cache(key) + def plugins_by_type(self, base_cls): """ Return a list of loaded plugins that match the passed type. @@ -289,11 +303,12 @@ class BaseCore(object): self.logger.debug("Performance logging thread starting") while not self.terminate.isSet(): self.terminate.wait(self.setup['perflog_interval']) - for name, stats in self.get_statistics(None).items(): - self.logger.info("Performance statistics: " - "%s min=%.06f, max=%.06f, average=%.06f, " - "count=%d" % ((name, ) + stats)) - self.logger.debug("Performance logging thread terminated") + if not self.terminate.isSet(): + for name, stats in self.get_statistics(None).items(): + self.logger.info("Performance statistics: " + "%s min=%.06f, max=%.06f, average=%.06f, " + "count=%d" % ((name, ) + stats)) + self.logger.info("Performance logging thread terminated") def _file_monitor_thread(self): """ The thread that runs the @@ -310,11 +325,12 @@ class BaseCore(object): else: if not self.fam.pending(): terminate.wait(15) + if self.fam.pending(): + self._update_vcs_revision() self.fam.handle_event_set(self.lock) except: continue - self._update_vcs_revision() - self.logger.debug("File monitor thread terminated") + self.logger.info("File monitor thread terminated") @Bcfg2.Server.Statistics.track_statistics() def _update_vcs_revision(self): @@ -431,14 +447,14 @@ class BaseCore(object): def shutdown(self): """ Perform plugin and FAM shutdown tasks. """ - self.logger.debug("Shutting down core...") + self.logger.info("Shutting down core...") if not self.terminate.isSet(): self.terminate.set() self.fam.shutdown() - self.logger.debug("FAM shut down") + self.logger.info("FAM shut down") for plugin in list(self.plugins.values()): plugin.shutdown() - self.logger.debug("All plugins shut down") + self.logger.info("All plugins shut down") @property def metadata_cache_mode(self): @@ -730,7 +746,7 @@ class BaseCore(object): if event.code2str() == 'deleted': return self.setup.reparse() - self.metadata_cache.expire() + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def block_for_fam_events(self, handle_events=False): """ Block until all fam events have been handleed, optionally @@ -920,6 +936,9 @@ class BaseCore(object): imd.query.by_name = self.build_metadata if self.metadata_cache_mode in ['cautious', 'aggressive']: self.metadata_cache[client_name] = imd + else: + self.logger.debug("Using cached metadata object for %s" % + client_name) return imd def process_statistics(self, client_name, statistics): @@ -947,6 +966,7 @@ class BaseCore(object): state.get('state'))) self.client_run_hook("end_statistics", meta) + @track_statistics() def resolve_client(self, address, cleanup_cache=False, metadata=True): """ Given a client address, get the client hostname and optionally metadata. @@ -1002,12 +1022,10 @@ class BaseCore(object): def _get_rmi(self): """ Get a list of RMI calls exposed by plugins """ rmi = dict() - for pname, pinst in list(self.plugins.items()): + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: for mname in pinst.__rmi__: rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) - famname = self.fam.__class__.__name__ - for mname in self.fam.__rmi__: - rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname) return rmi def _resolve_exposed_method(self, method_name): @@ -1100,6 +1118,7 @@ class BaseCore(object): for plugin in self.plugins_by_type(Probing): for probe in plugin.GetProbes(metadata): resp.append(probe) + self.logger.debug("Sending probe list to %s" % client) return lxml.etree.tostring(resp, xml_declaration=False).decode('UTF-8') except: @@ -1125,7 +1144,7 @@ class BaseCore(object): # that's created for RecvProbeData doesn't get cached. # I.e., the next metadata object that's built, after probe # data is processed, is cached. - self.metadata_cache.expire(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) try: xpdata = lxml.etree.XML(probedata.encode('utf-8'), parser=Bcfg2.Server.XMLParser) diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index b9716619d..e79207291 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -3,66 +3,134 @@ :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. -The parent communicates with the children over two constructs: - -* A :class:`multiprocessing.Pipe` is used to process render requests. - The pipe is locked when in use (i.e., between the time that a client - is submitted to be rendered and the time that its configuration is - returned) to keep things thread-safe. (This is accomplished through - the use of - :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) -* A :class:`multiprocessing.Queue` is used to submit other commands in - a thread-safe, non-blocking fashion. (Note that, since it is a - queue, no results can be returned.) It implements a very simple RPC - protocol. Each command passed to a child over the Pipe must be a - tuple with the format:: - - (<method>, <args>, <kwargs>) - - The method must be exposed by the child by decorating it with - :func:`Bcfg2.Server.Core.exposed`. +The parent communicates with the children over +:class:`multiprocessing.Queue` objects via a +:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object. + +A method being called via the RPCQueue must be exposed by the child by +decorating it with :func:`Bcfg2.Server.Core.exposed`. """ +import time import threading import lxml.etree import multiprocessing -from Bcfg2.Compat import Queue -from Bcfg2.Server.Cache import Cache +import Bcfg2.Server.Plugin +from itertools import cycle +from Bcfg2.Cache import Cache +from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from multiprocessing.connection import Listener, Client -class DispatchingCache(Cache, Debuggable): +class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates cache expiration events to child nodes. """ #: The method to send over the pipe to expire the cache - method = "expire_cache" + method = "expire_metadata_cache" def __init__(self, *args, **kwargs): - #: A dict of <child name>: :class:`multiprocessing.Queue` - #: objects that should be given a cache expiration command any - #: time an item is expired. - self.command_queues = kwargs.pop("pipes", dict()) - - Debuggable.__init__(self) + self.rpc_q = kwargs.pop("queue") + Bcfg2.Server.Plugin.Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): - if (key and key in self) or (not key and len(self)): - # dispatching cache expiration to children can be - # expensive, so only do it if there's something to expire - for child, cmd_q in self.command_queues.items(): - if key: - self.logger.debug("Expiring metadata cache for %s on %s" % - (key, child)) - else: - self.logger.debug("Expiring metadata cache on %s" % child) - cmd_q.put((self.method, [key], dict())) + self.rpc_q.publish(self.method, args=[key]) Cache.expire(self, key=key) +class RPCQueue(Bcfg2.Server.Plugin.Debuggable): + """ An implementation of a :class:`multiprocessing.Queue` designed + for several additional use patterns: + + * Random-access reads, based on a key that identifies the data; + * Publish-subscribe, where a datum is sent to all hosts. + + The subscribers can deal with this as a normal Queue with no + special handling. + """ + poll_wait = 3.0 + + def __init__(self): + Bcfg2.Server.Plugin.Debuggable.__init__(self) + self._terminate = threading.Event() + self._queues = dict() + self._available_listeners = Queue() + self._blocking_listeners = [] + + def add_subscriber(self, name): + """ Add a subscriber to the queue. This returns the + :class:`multiprocessing.Queue` object that the subscriber + should read from. """ + self._queues[name] = multiprocessing.Queue() + return self._queues[name] + + def publish(self, method, args=None, kwargs=None): + """ Publish an RPC call to the queue for consumption by all + subscribers. """ + for queue in self._queues.values(): + queue.put((None, (method, args or [], kwargs or dict()))) + + def rpc(self, dest, method, args=None, kwargs=None): + """ Make an RPC call to the named subscriber, expecting a + response. This opens a + :class:`multiprocessing.connection.Listener` and passes the + Listener address to the child as part of the RPC call, so that + the child can connect to the Listener to submit its results. + + Listeners are reused when possible to minimize overhead. + """ + try: + listener = self._available_listeners.get_nowait() + self.logger.debug("Reusing existing RPC listener at %s" % + listener.address) + except Empty: + listener = Listener() + self.logger.debug("Created new RPC listener at %s" % + listener.address) + self._blocking_listeners.append(listener) + try: + self._queues[dest].put((listener.address, + (method, args or [], kwargs or dict()))) + conn = listener.accept() + self._blocking_listeners.remove(listener) + try: + while not self._terminate.is_set(): + if conn.poll(self.poll_wait): + return conn.recv() + finally: + conn.close() + finally: + self._available_listeners.put(listener) + + def close(self): + """ Close queues and connections. """ + self._terminate.set() + self.logger.debug("Closing RPC queues") + for name, queue in self._queues.items(): + self.logger.debug("Closing RPC queue to %s" % name) + queue.close() + + # close any listeners that are waiting for connections + self.logger.debug("Closing RPC connections") + for listener in self._blocking_listeners: + self.logger.debug("Closing RPC connection at %s" % + listener.address) + listener.close() + + self.logger.debug("Closing RPC listeners") + try: + while True: + listener = self._available_listeners.get_nowait() + self.logger.debug("Closing RPC listener at %s" % + listener.address) + listener.close() + except Empty: + pass + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -111,101 +179,154 @@ class ChildCore(BaseCore): those, though, if the pipe communication "protocol" were made more robust. """ - #: How long to wait while polling for new clients to build. This - #: doesn't affect the speed with which a client is built, but + #: How long to wait while polling for new RPC commands. This + #: doesn't affect the speed with which a command is processed, but #: setting it too high will result in longer shutdown times, since #: we only check for the termination event from the main process #: every ``poll_wait`` seconds. - poll_wait = 5.0 + poll_wait = 3.0 - def __init__(self, setup, render_pipe, command_queue, terminate): + def __init__(self, name, setup, rpc_q, terminate): """ + :param name: The name of this child + :type name: string :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser - :param pipe: The pipe to which client hostnames are added for - ChildCore objects to build configurations, and to - which client configurations are added after - having been built by ChildCore objects. - :type pipe: multiprocessing.Pipe + :param read_q: The queue the child will read from for RPC + communications from the parent process. + :type read_q: multiprocessing.Queue + :param write_q: The queue the child will write the results of + RPC calls to. + :type write_q: multiprocessing.Queue :param terminate: An event that flags ChildCore objects to shut themselves down. :type terminate: multiprocessing.Event """ BaseCore.__init__(self, setup) - #: The pipe to which client hostnames are added for ChildCore - #: objects to build configurations, and to which client - #: configurations are added after having been built by - #: ChildCore objects. - self.render_pipe = render_pipe - - #: The queue from which other commands are received - self.command_queue = command_queue + #: The name of this child + self.name = name #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate - #: The :class:`threading.Thread` used to process commands - #: received via the :class:`multiprocessing.Queue` RPC - #: interface - self.command_thread = \ - threading.Thread(name="CommandThread", - target=self._command_queue_thread) + #: The queue used for RPC communication + self.rpc_q = rpc_q - def _daemonize(self): - return True + # override this setting so that the child doesn't try to write + # the pidfile + self.setup['daemon'] = False + + # ensure that the child doesn't start a perflog thread + self.perflog_thread = None + + self._rmi = dict() def _run(self): - try: - self.command_thread.start() - except: - self.shutdown() - raise return True - def render(self): - """ Process client configuration render requests """ - if self.render_pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. - self.fam.handle_events_in_interval(0.1) - client = self.render_pipe.recv() - self.logger.debug("Building configuration for %s" % client) - self.render_pipe.send( - lxml.etree.tostring(self.BuildConfiguration(client))) + def _daemonize(self): + return True + + def _dispatch(self, address, data): + """ Method dispatcher used for commands received from + the RPC queue. """ + if address is not None: + # if the key is None, then no response is expected. we + # make the return connection before dispatching the actual + # RPC call so that the parent is blocking for a connection + # as briefly as possible + self.logger.debug("Connecting to parent via %s" % address) + client = Client(address) + method, args, kwargs = data + func = None + rv = None + if "." in method: + if method in self._rmi: + func = self._rmi[method] + else: + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + elif not hasattr(self, method): + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + else: # method is not a plugin RMI, and exists + func = getattr(self, method) + if not func.exposed: + self.logger.error("%s: Method %s is not exposed" % (self.name, + method)) + func = None + if func is not None: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) + if address is not None: + # if the key is None, then no response is expected + self.logger.debug("Returning data to parent via %s" % address) + client.send(rv) def _block(self): - while not self.terminate.isSet(): + self._rmi = self._get_rmi() + while not self.terminate.is_set(): try: - self.render() + address, data = self.rpc_q.get(timeout=self.poll_wait) + threadname = "-".join(str(i) for i in data) + rpc_thread = threading.Thread(name=threadname, + target=self._dispatch, + args=[address, data]) + rpc_thread.start() + except Empty: + pass except KeyboardInterrupt: break self.shutdown() - def _command_queue_thread(self): - """ Process commands received on the command queue thread """ - while not self.terminate.isSet(): - method, args, kwargs = self.command_queue.get() - if hasattr(self, method): - func = getattr(self, method) - if func.exposed: - self.logger.debug("Child calling RPC method %s" % method) - func(*args, **kwargs) + def shutdown(self): + BaseCore.shutdown(self) + self.logger.info("%s: Closing RPC command queue" % self.name) + self.rpc_q.close() + + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("%s: Waiting for %d thread(s): %s" % + (self.name, len(threads), + [t.name for t in threads])) + time.sleep(1) + self.logger.info("%s: All threads stopped" % self.name) + + def _get_rmi(self): + rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + mname = crmi[1] else: - self.logger.error("Method %s is not exposed" % method) - else: - self.logger.error("Method %s does not exist" % method) + mname = crmi + rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) + return rmi @exposed - def expire_cache(self, client=None): + def expire_metadata_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) + @exposed + def RecvProbeData(self, address, _): + """ Expire the probe cache for a client """ + self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing, + key=self.resolve_client(address, + metadata=False)[0]) + + @exposed + def GetConfig(self, client): + """ Render the configuration for a client """ + self.logger.debug("%s: Building configuration for %s" % + (self.name, client)) + return lxml.etree.tostring(self.BuildConfiguration(client)) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -224,84 +345,162 @@ class Core(BuiltinCore): if setup['children'] is None: setup['children'] = multiprocessing.cpu_count() - #: A dict of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to submit render - #: requests to that child. (The child is given the other end - #: of the Pipe.) - self.render_pipes = dict() - - #: A dict of child name -> :class:`multiprocessing.Queue` - #: object used to pass commands to that child. - self.command_queues = dict() - - #: A queue that keeps track of which children are available to - #: render a configuration. A child is popped from the queue - #: when it starts to render a config, then it's pushed back on - #: when it's done. This lets us use a blocking call to - #: :func:`Queue.Queue.get` when waiting for an available - #: child. - self.available_children = Queue(maxsize=self.setup['children']) - - # sigh. multiprocessing was added in py2.6, which is when the - # camelCase methods for threading objects were deprecated in - # favor of the Pythonic under_score methods. So - # multiprocessing.Event *only* has is_set(), while - # threading.Event has *both* isSet() and is_set(). In order - # to make the core work with Python 2.4+, and with both - # multiprocessing and threading Event objects, we just - # monkeypatch self.terminate to have isSet(). + #: The flag that indicates when to stop child threads and + #: processes self.terminate = DualEvent(threading_event=self.terminate) - self.metadata_cache = DispatchingCache() + #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object + #: used to send or publish commands to children. + self.rpc_q = RPCQueue() + + self.metadata_cache = DispatchingCache(queue=self.rpc_q) + + #: A list of children that will be cycled through + self._all_children = [] + + #: An iterator that each child will be taken from in sequence, + #: to provide a round-robin distribution of render requests + self.children = None def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum - # create Pipe for render requests and results - (mainpipe, childpipe) = multiprocessing.Pipe() - self.render_pipes[name] = mainpipe - - # create Queue for other commands - cmd_q = multiprocessing.Queue() - self.command_queues[name] = cmd_q - self.metadata_cache.command_queues[name] = cmd_q - self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) + child_q = self.rpc_q.add_subscriber(name) + childcore = ChildCore(name, self.setup, child_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, child.pid)) - self.available_children.put(name) + self._all_children.append(name) + self.logger.debug("Started %s children: %s" % (len(self._all_children), + self._all_children)) + self.children = cycle(self._all_children) return BuiltinCore._run(self) def shutdown(self): BuiltinCore.shutdown(self) - for child in multiprocessing.active_children(): - self.logger.debug("Shutting down child %s" % child.name) - child.join(self.shutdown_timeout) - if child.is_alive(): + self.logger.info("Closing RPC command queues") + self.rpc_q.close() + + def term_children(): + """ Terminate all remaining multiprocessing children. """ + for child in multiprocessing.active_children(): self.logger.error("Waited %s seconds to shut down %s, " "terminating" % (self.shutdown_timeout, child.name)) child.terminate() - else: - self.logger.debug("Child %s shut down" % child.name) - self.logger.debug("All children shut down") + + timer = threading.Timer(self.shutdown_timeout, term_children) + timer.start() + while len(multiprocessing.active_children()): + self.logger.info("Waiting for %s child(ren): %s" % + (len(multiprocessing.active_children()), + [c.name + for c in multiprocessing.active_children()])) + time.sleep(1) + timer.cancel() + self.logger.info("All children shut down") + + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("Waiting for %s thread(s): %s" % + (len(threads), [t.name for t in threads])) + time.sleep(1) + self.logger.info("Shutdown complete") + + def _get_rmi(self): + child_rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + parentname, childname = crmi + else: + parentname = childname = crmi + child_rmi["%s.%s" % (pname, parentname)] = \ + "%s.%s" % (pname, childname) + + rmi = BuiltinCore._get_rmi(self) + for method in rmi.keys(): + if method in child_rmi: + rmi[method] = self._child_rmi_wrapper(method, + rmi[method], + child_rmi[method]) + return rmi + + def _child_rmi_wrapper(self, method, parent_rmi, child_rmi): + """ Returns a callable that dispatches a call to the given + child RMI to child processes, and calls the parent RMI locally + (i.e., in the parent process). """ + @wraps(parent_rmi) + def inner(*args, **kwargs): + self.logger.debug("Dispatching RMI call to %s to children: %s" % + (method, child_rmi)) + self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs) + return parent_rmi(*args, **kwargs) + + return inner @exposed def set_debug(self, address, debug): + self.rpc_q.set_debug(debug) + self.rpc_q.publish("set_debug", args=[address, debug]) self.metadata_cache.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) @exposed + def RecvProbeData(self, address, probedata): + rv = BuiltinCore.RecvProbeData(self, address, probedata) + # we don't want the children to actually process probe data, + # so we don't send the data, just the fact that we got some. + self.rpc_q.publish("RecvProbeData", args=[address, None]) + return rv + + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] - childname = self.available_children.get() - self.logger.debug("Building configuration on child %s" % childname) - pipe = self.render_pipes[childname] - pipe.send(client) - config = pipe.recv() - self.available_children.put_nowait(childname) - return config + childname = self.children.next() + self.logger.debug("Building configuration for %s on %s" % (client, + childname)) + return self.rpc_q.rpc(childname, "GetConfig", args=[client]) + + @exposed + def get_statistics(self, address): + stats = dict() + + def _aggregate_statistics(newstats, prefix=None): + """ Aggregate a set of statistics from a child or parent + server core. This adds the statistics to the overall + statistics dict (optionally prepending a prefix, such as + "Child-1", to uniquely identify this set of statistics), + and aggregates it with the set of running totals that are + kept from all cores. """ + for statname, vals in newstats.items(): + if statname.startswith("ChildCore:"): + statname = statname[5:] + if prefix: + prettyname = "%s:%s" % (prefix, statname) + else: + prettyname = statname + stats[prettyname] = vals + totalname = "Total:%s" % statname + if totalname not in stats: + stats[totalname] = vals + else: + newmin = min(stats[totalname][0], vals[0]) + newmax = max(stats[totalname][1], vals[1]) + newcount = stats[totalname][3] + vals[3] + newmean = ((stats[totalname][2] * stats[totalname][3]) + + (vals[2] * vals[3])) / newcount + stats[totalname] = (newmin, newmax, newmean, newcount) + + stats = dict() + for childname in self._all_children: + _aggregate_statistics( + self.rpc_q.rpc(childname, "get_statistics", args=[address]), + prefix=childname) + _aggregate_statistics(BuiltinCore.get_statistics(self, address)) + return stats diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py index c825a57b5..03feceb6f 100644 --- a/src/lib/Bcfg2/Server/Plugin/base.py +++ b/src/lib/Bcfg2/Server/Plugin/base.py @@ -12,6 +12,10 @@ class Debuggable(object): #: List of names of methods to be exposed as XML-RPC functions __rmi__ = ['toggle_debug', 'set_debug'] + #: How exposed XML-RPC functions should be dispatched to child + #: processes. + __child_rmi__ = __rmi__[:] + def __init__(self, name=None): """ :param name: The name of the logger object to get. If none is @@ -34,9 +38,6 @@ class Debuggable(object): :returns: bool - The new value of the debug flag """ self.debug_flag = debug - self.debug_log("%s: debug = %s" % (self.__class__.__name__, - self.debug_flag), - flag=True) return debug def toggle_debug(self): @@ -94,6 +95,20 @@ class Plugin(Debuggable): #: List of names of methods to be exposed as XML-RPC functions __rmi__ = Debuggable.__rmi__ + #: How exposed XML-RPC functions should be dispatched to child + #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in + #: use. Items ``__child_rmi__`` can either be strings (in which + #: case the same function is called on child processes as on the + #: parent) or 2-tuples, in which case the first element is the + #: name of the RPC function called on the parent process, and the + #: second element is the name of the function to call on child + #: processes. Functions that are not listed in the list will not + #: be dispatched to child processes, i.e., they will only be + #: called on the parent. A function must be listed in ``__rmi__`` + #: in order to be exposed; functions listed in ``_child_rmi__`` + #: but not ``__rmi__`` will be ignored. + __child_rmi__ = Debuggable.__child_rmi__ + def __init__(self, core, datastore): """ :param core: The Bcfg2.Server.Core initializing the plugin @@ -136,6 +151,8 @@ class Plugin(Debuggable): self.running = False def set_debug(self, debug): + self.debug_log("%s: debug = %s" % (self.name, self.debug_flag), + flag=True) for entry in self.Entries.values(): if isinstance(entry, Debuggable): entry.set_debug(debug) diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py index 7909eaa03..ed4afb9b2 100644 --- a/src/lib/Bcfg2/Server/Plugin/interfaces.py +++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py @@ -630,3 +630,22 @@ class ClientACLs(object): :returns: bool """ return True + + +class Caching(object): + """ A plugin that caches more than just the data received from the + FAM. This presents a unified interface to clear the cache. """ + + def expire_cache(self, key=None): + """ Expire the cache associated with the given key. + + :param key: The key to expire the cache for. Because cache + implementations vary tremendously between plugins, + this could be any number of things, but generally + a hostname. It also may or may not be possible to + expire the cache for a single host; this interface + does not require any guarantee about that. + :type key: varies + :returns: None + """ + raise NotImplementedError diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py index 6d6df3cc3..c5969f978 100644 --- a/src/lib/Bcfg2/Server/Plugins/Guppy.py +++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py @@ -34,6 +34,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin): """Guppy is a debugging plugin to help trace memory leaks""" __author__ = 'bcfg-dev@mcs.anl.gov' __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable'] + __child_rmi__ = __rmi__[:] def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index f37e0d97c..f355fd7de 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -486,6 +486,7 @@ class MetadataGroup(tuple): # pylint: disable=E0012,R0924 class Metadata(Bcfg2.Server.Plugin.Metadata, + Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.ClientRunHooks, Bcfg2.Server.Plugin.DatabaseBacked): """This class contains data for bcfg2 server metadata.""" @@ -494,6 +495,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, def __init__(self, core, datastore, watch_clients=True): Bcfg2.Server.Plugin.Metadata.__init__(self) + Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.ClientRunHooks.__init__(self) Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) self.watch_clients = watch_clients @@ -934,13 +936,16 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.groups[gname] self.states['groups.xml'] = True + def expire_cache(self, key=None): + self.core.metadata_cache.expire(key) + def HandleEvent(self, event): """Handle update events for data files.""" for handles, event_handler in self.handlers.items(): if handles(event): # clear the entire cache when we get an event for any # metadata file - self.core.metadata_cache.expire() + self.expire_cache() event_handler(event) if False not in list(self.states.values()) and self.debug_flag: @@ -978,17 +983,21 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.logger.error(msg) raise Bcfg2.Server.Plugin.PluginExecutionError(msg) - profiles = [g for g in self.clientgroups[client] - if g in self.groups and self.groups[g].is_profile] - self.logger.info("Changing %s profile from %s to %s" % - (client, profiles, profile)) - self.update_client(client, dict(profile=profile)) - if client in self.clientgroups: - for prof in profiles: - self.clientgroups[client].remove(prof) - self.clientgroups[client].append(profile) + metadata = self.core.build_metadata(client) + if metadata.profile != profile: + self.logger.info("Changing %s profile from %s to %s" % + (client, metadata.profile, profile)) + self.update_client(client, dict(profile=profile)) + if client in self.clientgroups: + if metadata.profile in self.clientgroups[client]: + self.clientgroups[client].remove(metadata.profile) + self.clientgroups[client].append(profile) + else: + self.clientgroups[client] = [profile] else: - self.clientgroups[client] = [profile] + self.logger.debug( + "Ignoring %s request to change profile from %s to %s" + % (client, metadata.profile, profile)) else: self.logger.info("Creating new client: %s, profile %s" % (client, profile)) @@ -1004,8 +1013,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.add_client(client, dict(profile=profile)) self.clients.append(client) self.clientgroups[client] = [profile] - if not self._use_db: - self.clients_xml.write() + if not self._use_db: + self.clients_xml.write() def set_version(self, client, version): """Set version for provided client.""" diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py index aa6127f57..9ff2d53a0 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py @@ -85,13 +85,12 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile, :type event: Bcfg2.Server.FileMonitor.Event :returns: None """ - Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event) if event and event.filename != self.name: for fpath in self.extras: if fpath == os.path.abspath(event.filename): self.parsed.add(fpath) break - + Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event) if self.loaded: self.logger.info("Reloading Packages plugin") self.pkg_obj.Reload() @@ -108,10 +107,11 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile, def Index(self): Bcfg2.Server.Plugin.StructFile.Index(self) self.entries = [] - for xsource in self.xdata.findall('.//Source'): - source = self.source_from_xml(xsource) - if source is not None: - self.entries.append(source) + if self.loaded: + for xsource in self.xdata.findall('.//Source'): + source = self.source_from_xml(xsource) + if source is not None: + self.entries.append(source) Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__ + """ ``Index`` is responsible for calling :func:`source_from_xml` diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 453198ac8..75dab3f76 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -265,6 +265,8 @@ class YumCollection(Collection): .. private-include: _add_gpg_instances, _get_pulp_consumer """ + _helper = None + #: Options that are included in the [packages:yum] section of the #: config but that should not be included in the temporary #: yum.conf we write out @@ -282,7 +284,6 @@ class YumCollection(Collection): #: external commands self.cmd = Executor() - self._helper = None if self.use_yum: #: Define a unique cache file for this collection to use #: for cached yum metadata @@ -320,7 +321,7 @@ class YumCollection(Collection): self.logger.error("Could not create Pulp consumer " "cert directory at %s: %s" % (certdir, err)) - self.pulp_cert_set = PulpCertificateSet(certdir) + self.__class__.pulp_cert_set = PulpCertificateSet(certdir) @property def disableMetaData(self): @@ -355,16 +356,19 @@ class YumCollection(Collection): forking, but apparently not); finally we check in /usr/sbin, the default location. """ if not self._helper: + # pylint: disable=W0212 try: - self._helper = self.setup.cfp.get("packages:yum", "helper") + self.__class__._helper = self.setup.cfp.get("packages:yum", + "helper") except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): # first see if bcfg2-yum-helper is in PATH try: self.debug_log("Checking for bcfg2-yum-helper in $PATH") self.cmd.run(['bcfg2-yum-helper']) - self._helper = 'bcfg2-yum-helper' + self.__class__._helper = 'bcfg2-yum-helper' except OSError: - self._helper = "/usr/sbin/bcfg2-yum-helper" + self.__class__._helper = "/usr/sbin/bcfg2-yum-helper" + # pylint: enable=W0212 return self._helper @property @@ -945,9 +949,14 @@ class YumCollection(Collection): try: return json.loads(result.stdout) except ValueError: - err = sys.exc_info()[1] - self.logger.error("Packages: error reading bcfg2-yum-helper " - "output: %s" % err) + if result.stdout: + err = sys.exc_info()[1] + self.logger.error("Packages: Error reading bcfg2-yum-helper " + "output: %s" % err) + self.logger.error("Packages: bcfg2-yum-helper output: %s" % + result.stdout) + else: + self.logger.error("Packages: No bcfg2-yum-helper output") raise def setup_data(self, force_update=False): diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py index 03047b046..20a75c678 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py @@ -71,6 +71,7 @@ class OnDemandDict(MutableMapping): class Packages(Bcfg2.Server.Plugin.Plugin, + Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.StructureValidator, Bcfg2.Server.Plugin.Generator, Bcfg2.Server.Plugin.Connector, @@ -93,8 +94,12 @@ class Packages(Bcfg2.Server.Plugin.Plugin, #: and :func:`Reload` __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload'] + __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \ + [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')] + def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) + Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.StructureValidator.__init__(self) Bcfg2.Server.Plugin.Generator.__init__(self) Bcfg2.Server.Plugin.Connector.__init__(self) @@ -149,8 +154,21 @@ class Packages(Bcfg2.Server.Plugin.Plugin, #: object when one is requested, so each entry is very #: short-lived -- it's purged at the end of each client run. self.clients = dict() - # pylint: enable=C0301 + #: groupcache caches group lookups. It maps Collections (via + #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`) + #: to sets of package groups, and thence to the packages + #: indicated by those groups. + self.groupcache = dict() + + #: pkgcache caches complete package sets. It maps Collections + #: (via + #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`) + #: to sets of initial packages, and thence to the final + #: (complete) package selections resolved from the initial + #: packages + self.pkgcache = dict() + # pylint: enable=C0301 __init__.__doc__ = Bcfg2.Server.Plugin.Plugin.__init__.__doc__ def set_debug(self, debug): @@ -388,14 +406,24 @@ class Packages(Bcfg2.Server.Plugin.Plugin, for el in to_remove: el.getparent().remove(el) - gpkgs = collection.get_groups(groups) - for pkgs in gpkgs.values(): + groups.sort() + # check for this set of groups in the group cache + gkey = hash(tuple(groups)) + if gkey not in self.groupcache[collection.cachekey]: + self.groupcache[collection.cachekey][gkey] = \ + collection.get_groups(groups) + for pkgs in self.groupcache[collection.cachekey][gkey].values(): base.update(pkgs) # essential pkgs are those marked as such by the distribution base.update(collection.get_essential()) - packages, unknown = collection.complete(base) + # check for this set of packages in the package cache + pkey = hash(tuple(base)) + if pkey not in self.pkgcache[collection.cachekey]: + self.pkgcache[collection.cachekey][pkey] = \ + collection.complete(base) + packages, unknown = self.pkgcache[collection.cachekey][pkey] if unknown: self.logger.info("Packages: Got %d unknown entries" % len(unknown)) self.logger.info("Packages: %s" % list(unknown)) @@ -421,6 +449,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin, self._load_config() return True + def expire_cache(self, _=None): + self.Reload() + def _load_config(self, force_update=False): """ Load the configuration data and setup sources @@ -448,9 +479,11 @@ class Packages(Bcfg2.Server.Plugin.Plugin, if not self.disableMetaData: collection.setup_data(force_update) - # clear Collection caches + # clear Collection and package caches self.clients = dict() self.collections = dict() + self.groupcache = dict() + self.pkgcache = dict() for source in self.sources.entries: cachefiles.add(source.cachefile) @@ -563,6 +596,8 @@ class Packages(Bcfg2.Server.Plugin.Plugin, if cclass != Collection: self.clients[metadata.hostname] = ckey self.collections[ckey] = collection + self.groupcache.setdefault(ckey, dict()) + self.pkgcache.setdefault(ckey, dict()) return collection def get_additional_data(self, metadata): diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index c6cf920df..7b85f180d 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -183,14 +183,16 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet): class Probes(Bcfg2.Server.Plugin.Probing, + Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.Connector, Bcfg2.Server.Plugin.DatabaseBacked): """ A plugin to gather information from a client machine """ __author__ = 'bcfg-dev@mcs.anl.gov' def __init__(self, core, datastore): - Bcfg2.Server.Plugin.Connector.__init__(self) Bcfg2.Server.Plugin.Probing.__init__(self) + Bcfg2.Server.Plugin.Caching.__init__(self) + Bcfg2.Server.Plugin.Connector.__init__(self) Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) try: @@ -268,12 +270,17 @@ class Probes(Bcfg2.Server.Plugin.Probing, hostname=client.hostname).exclude( group__in=self.cgroups[client.hostname]).delete() - def load_data(self): + def expire_cache(self, key=None): + self.load_data(client=key) + + def load_data(self, client=None): """ Load probe data from the appropriate backend (probed.xml or the database) """ if self._use_db: - return self._load_data_db() + return self._load_data_db(client=client) else: + # the XML backend doesn't support loading data for single + # clients, so it reloads all data return self._load_data_xml() def _load_data_xml(self): @@ -298,20 +305,36 @@ class Probes(Bcfg2.Server.Plugin.Probing, elif pdata.tag == 'Group': self.cgroups[client.get('name')].append(pdata.get('name')) - def _load_data_db(self): + if self.core.metadata_cache_mode in ['cautious', 'aggressive']: + self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) + + def _load_data_db(self, client=None): """ Load probe data from the database """ - self.probedata = {} - self.cgroups = {} - for pdata in ProbesDataModel.objects.all(): + if client is None: + self.probedata = {} + self.cgroups = {} + probedata = ProbesDataModel.objects.all() + groupdata = ProbesGroupsModel.objects.all() + else: + self.probedata.pop(client, None) + self.cgroups.pop(client, None) + probedata = ProbesDataModel.objects.filter(hostname=client) + groupdata = ProbesGroupsModel.objects.filter(hostname=client) + + for pdata in probedata: if pdata.hostname not in self.probedata: self.probedata[pdata.hostname] = ClientProbeDataSet( timestamp=time.mktime(pdata.timestamp.timetuple())) self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data) - for pgroup in ProbesGroupsModel.objects.all(): + for pgroup in groupdata: if pgroup.hostname not in self.cgroups: self.cgroups[pgroup.hostname] = [] self.cgroups[pgroup.hostname].append(pgroup.group) + if self.core.metadata_cache_mode in ['cautious', 'aggressive']: + self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata, + key=client) + @track_statistics() def GetProbes(self, meta): return self.probes.get_probe_data(meta) diff --git a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py index 3b367573b..a02f012a0 100644 --- a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py +++ b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py @@ -117,7 +117,7 @@ class PuppetENC(Bcfg2.Server.Plugin.Plugin, self.logger.warning("PuppetENC is incompatible with aggressive " "client metadata caching, try 'cautious' or " "'initial' instead") - self.core.cache.expire() + self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def end_statistics(self, metadata): self.end_client_run(self, metadata) diff --git a/src/lib/Bcfg2/Server/Plugins/SSHbase.py b/src/lib/Bcfg2/Server/Plugins/SSHbase.py index 84dcf2780..f350a7761 100644 --- a/src/lib/Bcfg2/Server/Plugins/SSHbase.py +++ b/src/lib/Bcfg2/Server/Plugins/SSHbase.py @@ -90,6 +90,7 @@ class KnownHostsEntrySet(Bcfg2.Server.Plugin.EntrySet): class SSHbase(Bcfg2.Server.Plugin.Plugin, + Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.Generator, Bcfg2.Server.Plugin.PullTarget): """ @@ -123,6 +124,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin, def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) + Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.Generator.__init__(self) Bcfg2.Server.Plugin.PullTarget.__init__(self) self.ipcache = {} @@ -147,9 +149,11 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin, self.entries["/etc/ssh/" + keypattern] = \ HostKeyEntrySet(keypattern, self.data) self.Entries['Path']["/etc/ssh/" + keypattern] = self.build_hk - self.cmd = Executor() + def expire_cache(self, key=None): + self.__skn = False + def get_skn(self): """Build memory cache of the ssh known hosts file.""" if not self.__skn: diff --git a/src/lib/Bcfg2/Utils.py b/src/lib/Bcfg2/Utils.py index 59065479a..993fd9e0f 100644 --- a/src/lib/Bcfg2/Utils.py +++ b/src/lib/Bcfg2/Utils.py @@ -235,9 +235,9 @@ class Executor(object): # py3k fixes if not isinstance(stdout, str): - stdout = stdout.decode('utf-8') + stdout = stdout.decode('utf-8') # pylint: disable=E1103 if not isinstance(stderr, str): - stderr = stderr.decode('utf-8') + stderr = stderr.decode('utf-8') # pylint: disable=E1103 for line in stdout.splitlines(): # pylint: disable=E1103 self.logger.debug('< %s' % line) diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt index 6a92a0260..9190f1390 100755 --- a/src/sbin/bcfg2-crypt +++ b/src/sbin/bcfg2-crypt @@ -160,6 +160,7 @@ class CfgDecryptor(Decryptor): except Bcfg2.Server.Encryption.EVPError: self.logger.info("Could not decrypt %s with any passphrase" % self.filename) + return False def get_destination_filename(self, original_filename): if original_filename.endswith(".crypt"): @@ -419,7 +420,7 @@ def main(): # pylint: disable=R0912,R0915 if data is None: data = getattr(tool, mode)() - if data is False: + if not data: logger.error("Failed to %s %s, skipping" % (mode, fname)) continue if setup['crypt_stdout']: diff --git a/src/sbin/bcfg2-info b/src/sbin/bcfg2-info index 787ed1d49..9e3a671da 100755 --- a/src/sbin/bcfg2-info +++ b/src/sbin/bcfg2-info @@ -471,9 +471,10 @@ Bcfg2 client itself.""") alist = args.split() if len(alist): for client in self._get_client_list(alist): - self.metadata_cache.expire(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata, + key=client) else: - self.metadata_cache.expire() + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def do_probes(self, args): """probes [-p] <hostname> diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper index 161aa3e50..49baeb9c3 100755 --- a/src/sbin/bcfg2-yum-helper +++ b/src/sbin/bcfg2-yum-helper @@ -255,9 +255,15 @@ class CacheManager(YumHelper): for repo in self.yumbase.repos.listEnabled(): # this populates the cache as a side effect repo.repoXML # pylint: disable=W0104 + try: + repo.getGroups() + except yum.Errors.RepoMDError: + pass # this repo has no groups self.yumbase.repos.populateSack(mdtype='metadata', cacheonly=1) self.yumbase.repos.populateSack(mdtype='filelists', cacheonly=1) self.yumbase.repos.populateSack(mdtype='otherdata', cacheonly=1) + # this does something with the groups cache as a side effect + self.yumbase.comps # pylint: disable=W0104 def main(): @@ -304,11 +310,11 @@ def main(): try: # this code copied from yumcommands.py cachemgr.populate_cache() - print json.dumps(True) + print(json.dumps(True)) except yum.Errors.YumBaseError: logger.error("Unexpected error creating cache: %s" % sys.exc_info()[1], exc_info=1) - print json.dumps(False) + print(json.dumps(False)) elif cmd == "complete": depsolver = DepSolver(options.config, options.verbose) try: diff --git a/testsuite/Testschema/test_schema.py b/testsuite/Testschema/test_schema.py index ddfe4775f..cd9b74cdf 100644 --- a/testsuite/Testschema/test_schema.py +++ b/testsuite/Testschema/test_schema.py @@ -41,7 +41,7 @@ class TestSchemas(Bcfg2TestCase): xmllint = Popen(['xmllint', '--xinclude', '--noout', '--schema', self.schema_url] + schemas, stdout=PIPE, stderr=STDOUT) - print(xmllint.communicate()[0]) + print(xmllint.communicate()[0].decode()) self.assertEqual(xmllint.wait(), 0) def test_duplicates(self): diff --git a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py index e0406fd92..8e7b58d30 100644 --- a/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py +++ b/testsuite/Testsrc/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py @@ -1009,7 +1009,7 @@ class TestPOSIXTool(TestTool): else: return True ptool._set_perms.side_effect = set_perms_rv - self.assertFalse(ptool._makedirs(entry)) + self.assertTrue(ptool._makedirs(entry)) self.assertItemsEqual(mock_exists.call_args_list, [call("/test"), call("/test/foo"), call("/test/foo/bar")]) diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py index e26c26d41..870983f60 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py @@ -29,16 +29,11 @@ class TestDebuggable(Bcfg2TestCase): def test_set_debug(self): d = self.get_obj() - d.debug_log = Mock() self.assertEqual(True, d.set_debug(True)) self.assertEqual(d.debug_flag, True) - self.assertTrue(d.debug_log.called) - - d.debug_log.reset_mock() self.assertEqual(False, d.set_debug(False)) self.assertEqual(d.debug_flag, False) - self.assertTrue(d.debug_log.called) def test_toggle_debug(self): d = self.get_obj() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py index 13c27c149..90f592eb2 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -895,10 +895,13 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked): metadata = self.load_clients_data(metadata=self.load_groups_data()) if not metadata._use_db: metadata.clients_xml.write = Mock() + metadata.core.build_metadata = Mock() + metadata.core.build_metadata.side_effect = \ + lambda c: metadata.get_initial_metadata(c) + metadata.set_profile("client1", "group2", None) mock_update_client.assert_called_with("client1", dict(profile="group2")) - metadata.clients_xml.write.assert_any_call() self.assertEqual(metadata.clientgroups["client1"], ["group2"]) metadata.clients_xml.write.reset_mock() diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 30b08ef2f..f44bc338c 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -461,7 +461,7 @@ text def test_load_data_db(self): probes = self.get_probes_object(use_db=True) probes.load_data() - probes._load_data_db.assert_any_call() + probes._load_data_db.assert_any_call(client=None) self.assertFalse(probes._load_data_xml.called) @patch("lxml.etree.parse") diff --git a/testsuite/Testsrc/test_code_checks.py b/testsuite/Testsrc/test_code_checks.py index 415b316fd..87b63a6ab 100644 --- a/testsuite/Testsrc/test_code_checks.py +++ b/testsuite/Testsrc/test_code_checks.py @@ -70,7 +70,9 @@ no_checks = { "lib/Bcfg2/Server/Reports": ["manage.py"], "lib/Bcfg2/Server/Plugins": ["Base.py"], } - +if sys.version_info > (2, 5): + # multiprocessing core requires py2.6 + no_checks['lib/Bcfg2/Server'].append('MultiprocessingCore.py') try: any @@ -177,7 +179,7 @@ class CodeTestCase(Bcfg2TestCase): cmd = self.command + self.full_args + extra_args + \ [os.path.join(srcpath, f) for f in files] proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, env=self.get_env()) - print(proc.communicate()[0]) + print(proc.communicate()[0].decode()) self.assertEqual(proc.wait(), 0) def _test_errors(self, files, extra_args=None): @@ -189,7 +191,7 @@ class CodeTestCase(Bcfg2TestCase): cmd = self.command + self.error_args + extra_args + \ [os.path.join(srcpath, f) for f in files] proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, env=self.get_env()) - print(proc.communicate()[0]) + print(proc.communicate()[0].decode()) self.assertEqual(proc.wait(), 0) @skipIf(not os.path.exists(srcpath), "%s does not exist" % srcpath) @@ -312,7 +314,7 @@ class TestPylint(CodeTestCase): args = self.command + self.error_args + extra_args + \ [os.path.join(srcpath, p) for p in files] pylint = Popen(args, stdout=PIPE, stderr=STDOUT, env=self.get_env()) - output = pylint.communicate()[0] + output = pylint.communicate()[0].decode() rv = pylint.wait() for line in output.splitlines(): diff --git a/testsuite/requirements.txt b/testsuite/requirements.txt index c76466cfe..4733d045c 100644 --- a/testsuite/requirements.txt +++ b/testsuite/requirements.txt @@ -2,7 +2,7 @@ lxml nose mock sphinx -pylint +pylint<1.0 pep8 python-daemon genshi |