diff options
author | Narayan Desai <desai@mcs.anl.gov> | 2009-05-15 03:57:33 +0000 |
---|---|---|
committer | Narayan Desai <desai@mcs.anl.gov> | 2009-05-15 03:57:33 +0000 |
commit | e226690c555e87ac13f5d2543d197864e4d0270c (patch) | |
tree | 18f2215e2206fb7afe986f8b7660cb0e2c997ada /src/lib | |
parent | 60a2643d4e7bd0a0964180ebbfd8e31d12cbbf98 (diff) | |
download | bcfg2-e226690c555e87ac13f5d2543d197864e4d0270c.tar.gz bcfg2-e226690c555e87ac13f5d2543d197864e4d0270c.tar.bz2 bcfg2-e226690c555e87ac13f5d2543d197864e4d0270c.zip |
Merge xmlrpc interfaces with Bcfg2.Server.Core
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5238 ce84e21b-d406-0410-9b95-82705330c041
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/Server/Core.py | 170 | ||||
-rw-r--r-- | src/lib/Server/XMLRPC.py | 160 |
2 files changed, 151 insertions, 179 deletions
diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py index 636542bf2..f21250d81 100644 --- a/src/lib/Server/Core.py +++ b/src/lib/Server/Core.py @@ -1,15 +1,26 @@ '''Bcfg2.Server.Core provides the runtime support for bcfg2 modules''' __revision__ = '$Revision$' -from time import time +import copy +import hashlib +import logging +import lxml.etree +import select +import threading +import time +import xmlrpclib +from Bcfg2.Component import Component, exposed from Bcfg2.Server.Plugin import PluginInitError, PluginExecutionError import Bcfg2.Server.FileMonitor - -import copy, logging, lxml.etree import Bcfg2.Server.Plugins.Metadata -logger = logging.getLogger('Bcfg2.Core') +logger = logging.getLogger('Bcfg2.Server.Core') + +def critical_error(operation): + '''Log and err, traceback and return an xmlrpc fault to client''' + logger.error(operation, exc_info=1) + raise xmlrpclib.Fault(7, "Critical unexpected failure: %s" % (operation)) try: import psyco @@ -21,12 +32,14 @@ class CoreInitError(Exception): '''This error is raised when the core cannot be initialized''' pass -class Core(object): +class Core(Component): '''The Core object is the container for all Bcfg2 Server logic, and modules''' + name = 'bcfg2' + implementation = 'bcfg2' - def __init__(self, repo, plugins, password, encoding, - filemonitor='default'): - object.__init__(self) + def __init__(self, repo, plugins, password, encoding, ca=None, + filemonitor='default', start_fam_thread=False): + Component.__init__(self) self.datastore = repo if filemonitor not in Bcfg2.Server.FileMonitor.available: logger.error("File monitor driver %s not available; forcing to default" % filemonitor) @@ -78,7 +91,24 @@ class Core(object): isinstance(plugin, Bcfg2.Server.Plugin.Structure)] self.connectors = [plugin for plugin in self.plugins.values() if \ isinstance(plugin, Bcfg2.Server.Plugin.Connector)] - + self.ca = ca + self.fam_thread = threading.Thread(target=self._file_monitor_thread) + if start_fam_thread: + self.fam_thread.start() + + def _file_monitor_thread(self): + famfd = self.fam.fileno() + while True: + try: + if famfd: + select.select([famfd], [], []) + else: + while not self.fam.pending(): + time.sleep(15) + self.fam.handle_event_set(self.lock) + except: + continue + def init_plugins(self, plugin): try: mod = getattr(__import__("Bcfg2.Server.Plugins.%s" % @@ -181,7 +211,7 @@ class Core(object): def BuildConfiguration(self, client): '''Build Configuration for client''' - start = time() + start = time.time() config = lxml.etree.Element("Configuration", version='2.0', \ revision=self.revision) try: @@ -219,17 +249,9 @@ class Core(object): logger.error("error in BindStructure", exc_info=1) self.validate_data(meta, config, Bcfg2.Server.Plugin.GoalValidator) logger.info("Generated config for %s in %.03fs" % \ - (client, time() - start)) + (client, time.time() - start)) return config - def Service(self): - '''Perform periodic update tasks''' - count = self.fam.Service() - if count: - for plugin in self.plugins.values(): - if isinstance(plugin, Bcfg2.Server.Plugin.Version): - self.revision = plugin.get_revision() - def GetDecisions(self, metadata, mode): result = [] for plugin in self.plugins.values(): @@ -267,3 +289,113 @@ class Core(object): logger.info("Client %s reported state %s" % (client_name, state.get('state'))) + # XMLRPC handlers start here + + @exposed + def GetProbes(self, address): + '''Fetch probes for a particular client''' + resp = lxml.etree.Element('probes') + try: + name = self.metadata.resolve_client(address) + meta = self.build_metadata(name) + + for plugin in [p for p in list(self.plugins.values()) \ + if isinstance(p, Bcfg2.Server.Plugin.Probing)]: + for probe in plugin.GetProbes(meta): + resp.append(probe) + return lxml.etree.tostring(resp, encoding='UTF-8', + xml_declaration=True) + except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: + warning = 'Client metadata resolution error for %s; check server log' % address[0] + self.logger.warning(warning) + raise xmlrpclib.Fault(6, warning) + except: + critical_error("error determining client probes") + + @exposed + def RecvProbeData(self, address, probedata): + '''Receive probe data from clients''' + try: + name = self.metadata.resolve_client(address) + meta = self.build_metadata(name) + except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: + warning = 'metadata consistency error' + self.logger.warning(warning) + raise xmlrpclib.Fault(6, warning) + # clear dynamic groups + self.metadata.cgroups[meta.hostname] = [] + try: + xpdata = lxml.etree.XML(probedata) + except: + self.logger.error("Failed to parse probe data from client %s" % \ + (address[0])) + return False + + sources = [] + [sources.append(data.get('source')) for data in xpdata + if data.get('source') not in sources] + for source in sources: + if source not in self.plugins: + self.logger.warning("Failed to locate plugin %s" % (source)) + continue + dl = [data for data in xpdata if data.get('source') == source] + try: + self.plugins[source].ReceiveData(meta, dl) + except: + logger.error("Failed to process probe data from client %s" % \ + (address[0]), exc_info=1) + return True + + @exposed + def AssertProfile(self, address, profile): + '''Set profile for a client''' + try: + client = self.metadata.resolve_client(address) + self.metadata.set_profile(client, profile, address) + except (Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError, + Bcfg2.Server.Plugins.Metadata.MetadataRuntimeError): + warning = 'metadata consistency error' + self.logger.warning(warning) + raise xmlrpclib.Fault(6, warning) + return True + + @exposed + def GetConfig(self, address, checksum=False): + '''Build config for a client''' + try: + client = self.metadata.resolve_client(address) + config = self.BuildConfiguration(client) + if checksum: + for cfile in config.findall('.//ConfigFile'): + if cfile.text != None: + csum = hashlib.md5() + csum.update(cfile.text) + cfile.set('checksum', csum.hexdigest()) + cfile.text = None + return lxml.etree.tostring(config, encoding='UTF-8', + xml_declaration=True) + except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: + self.logger.warning("Metadata consistency failure for %s" % (address)) + raise xmlrpclib.Fault(6, "Metadata consistency failure") + + @exposed + def RecvStats(self, address, stats): + '''Act on statistics upload''' + sdata = lxml.etree.XML(stats) + client = self.metadata.resolve_client(address) + self.process_statistics(client, sdata) + return "<ok/>" + + def authenticate(self, cert, user, password, address): + if self.ca: + acert = cert + else: + # no ca, so no cert validation can be done + acert = None + return self.metadata.AuthenticateConnection(acert, user, password, address) + + @exposed + def GetDecisionList(self, address, mode): + client = self.metadata.resolve_client(address) + meta = self.build_metadata(client) + return self.GetDecisions(meta, mode) diff --git a/src/lib/Server/XMLRPC.py b/src/lib/Server/XMLRPC.py deleted file mode 100644 index 36d7c3b79..000000000 --- a/src/lib/Server/XMLRPC.py +++ /dev/null @@ -1,160 +0,0 @@ - -import hashlib -import logging -import lxml.etree -import select -import socket -import threading -import time -import xmlrpclib - -from Bcfg2.Component import Component, automatic, exposed, locking -import Bcfg2.Server.Core - -logger = logging.getLogger('server') - -def critical_error(operation): - '''Log and err, traceback and return an xmlrpc fault to client''' - logger.error(operation, exc_info=1) - raise xmlrpclib.Fault(7, "Critical unexpected failure: %s" % (operation)) - -class SetupError(Exception): - '''Used when the server cant be setup''' - pass - -class bcfg2_server(Component, - Bcfg2.Server.Core.Core): - '''XML RPC interfaces for the server core''' - name = 'bcfg2-server' - implementation = 'bcfg2-server' - - def __init__(self, setup): - Component.__init__(self) - Bcfg2.Server.Core.Core.__init__(self, setup['repo'], setup['plugins'], - setup['password'], - setup['encoding'], setup['filemonitor']) - self.ca = setup['ca'] - self.fam_thread = threading.Thread(target=self._file_monitor_thread) - self.fam_thread.start() - - def _file_monitor_thread(self): - famfd = self.fam.fileno() - while True: - try: - if famfd: - rsockinfo = select.select([famfd], [], []) - else: - while not self.fam.pending(): - time.sleep(15) - self.fam.handle_event_set(self.lock) - except: - continue - - @exposed - def GetProbes(self, address): - '''Fetch probes for a particular client''' - resp = lxml.etree.Element('probes') - try: - name = self.metadata.resolve_client(address) - meta = self.build_metadata(name) - - for plugin in [p for p in list(self.plugins.values()) \ - if isinstance(p, Bcfg2.Server.Plugin.Probing)]: - for probe in plugin.GetProbes(meta): - resp.append(probe) - return lxml.etree.tostring(resp, encoding='UTF-8', - xml_declaration=True) - except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: - warning = 'Client metadata resolution error for %s; check server log' % address[0] - self.logger.warning(warning) - raise xmlrpclib.Fault(6, warning) - except: - critical_error("error determining client probes") - - @exposed - def RecvProbeData(self, address, probedata): - '''Receive probe data from clients''' - try: - name = self.metadata.resolve_client(address) - meta = self.build_metadata(name) - except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: - warning = 'metadata consistency error' - self.logger.warning(warning) - raise xmlrpclib.Fault(6, warning) - # clear dynamic groups - self.metadata.cgroups[meta.hostname] = [] - try: - xpdata = lxml.etree.XML(probedata) - except: - self.logger.error("Failed to parse probe data from client %s" % \ - (address[0])) - return False - - sources = [] - [sources.append(data.get('source')) for data in xpdata - if data.get('source') not in sources] - for source in sources: - if source not in self.plugins: - self.logger.warning("Failed to locate plugin %s" % (source)) - continue - dl = [data for data in xpdata if data.get('source') == source] - try: - self.plugins[source].ReceiveData(meta, dl) - except: - logger.error("Failed to process probe data from client %s" % \ - (address[0]), exc_info=1) - return True - - @exposed - def AssertProfile(self, address, profile): - '''Set profile for a client''' - try: - client = self.metadata.resolve_client(address) - self.metadata.set_profile(client, profile, address) - except (Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError, - Bcfg2.Server.Plugins.Metadata.MetadataRuntimeError): - warning = 'metadata consistency error' - self.logger.warning(warning) - raise xmlrpclib.Fault(6, warning) - return True - - @exposed - def GetConfig(self, address, checksum=False): - '''Build config for a client''' - try: - client = self.metadata.resolve_client(address) - config = self.BuildConfiguration(client) - if checksum: - for cfile in config.findall('.//ConfigFile'): - if cfile.text != None: - csum = hashlib.md5() - csum.update(cfile.text) - cfile.set('checksum', csum.hexdigest()) - cfile.text = None - return lxml.etree.tostring(config, encoding='UTF-8', - xml_declaration=True) - except Bcfg2.Server.Plugins.Metadata.MetadataConsistencyError: - self.logger.warning("Metadata consistency failure for %s" % (address)) - raise xmlrpclib.Fault(6, "Metadata consistency failure") - - @exposed - def RecvStats(self, address, stats): - '''Act on statistics upload''' - sdata = lxml.etree.XML(stats) - client = self.metadata.resolve_client(address) - self.process_statistics(client, sdata) - return "<ok/>" - - def authenticate(self, cert, user, password, address): - if self.ca: - acert = cert - else: - # no ca, so no cert validation can be done - acert = None - return self.metadata.AuthenticateConnection(acert, user, password, address) - - @exposed - def GetDecisionList(self, address, mode): - client = self.metadata.resolve_client(address) - meta = self.build_metadata(client) - return self.GetDecisions(meta, mode) |