diff options
Diffstat (limited to 'src/lib/Bcfg2/Reporting')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 52 | ||||
-rwxr-xr-x | src/lib/Bcfg2/Reporting/Reports.py | 276 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Storage/DjangoORM.py | 39 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Storage/__init__.py | 29 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Storage/base.py | 14 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/DirectStore.py | 17 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py | 30 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/RedisTransport.py | 55 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/__init__.py | 32 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/Transport/base.py | 16 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/migrations/0002_convert_perms_to_mode.py | 3 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/models.py | 12 | ||||
-rw-r--r-- | src/lib/Bcfg2/Reporting/views.py | 2 |
13 files changed, 380 insertions, 197 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 8ca145f16..12c9cdaa8 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -3,7 +3,6 @@ import atexit import daemon import logging import time -import traceback import threading # pylint: disable=E0611 @@ -16,11 +15,11 @@ except ImportError: # pylint: enable=E0611 import Bcfg2.Logger -from Bcfg2.Reporting.Transport import load_transport_from_config, \ - TransportError, TransportImportError +import Bcfg2.Options +from Bcfg2.Reporting.Transport.base import TransportError from Bcfg2.Reporting.Transport.DirectStore import DirectStore -from Bcfg2.Reporting.Storage import load_storage_from_config, \ - StorageError, StorageImportError +from Bcfg2.Reporting.Storage.base import StorageError + class ReportingError(Exception): @@ -51,47 +50,43 @@ class ReportingStoreThread(threading.Thread): except: #TODO requeue? self.logger.error("Unhandled exception in import thread %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) class ReportingCollector(object): """The collecting process for reports""" + options = [Bcfg2.Options.Common.reporting_storage, + Bcfg2.Options.Common.reporting_transport, + Bcfg2.Options.Common.daemon] - def __init__(self, setup): + def __init__(self): """Setup the collector. This may be called by the daemon or though bcfg2-admin""" - self.setup = setup - self.datastore = setup['repo'] - self.encoding = setup['encoding'] self.terminate = None self.context = None self.children = [] self.cleanup_threshold = 25 - if setup['debug']: + if Bcfg2.Options.setup.debug: level = logging.DEBUG - elif setup['verbose']: + elif Bcfg2.Options.setup.verbose: level = logging.INFO else: level = logging.WARNING - Bcfg2.Logger.setup_logging('bcfg2-report-collector', - to_console=logging.INFO, - to_syslog=setup['syslog'], - to_file=setup['logging'], - level=level) + Bcfg2.Logger.setup_logging() self.logger = logging.getLogger('bcfg2-report-collector') try: - self.transport = load_transport_from_config(setup) - self.storage = load_storage_from_config(setup) + self.transport = Bcfg2.Options.setup.reporting_transport() + self.storage = Bcfg2.Options.setup.reporting_storage() except TransportError: self.logger.error("Failed to load transport: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) raise ReportingError except StorageError: self.logger.error("Failed to load storage: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) raise ReportingError if isinstance(self.transport, DirectStore): @@ -102,12 +97,12 @@ class ReportingCollector(object): try: self.logger.debug("Validating storage %s" % - self.storage.__class__.__name__) + self.storage.__class__.__name__) self.storage.validate() except: self.logger.error("Storage backed %s failed to validate: %s" % - (self.storage.__class__.__name__, - traceback.format_exc().splitlines()[-1])) + (self.storage.__class__.__name__, + sys.exc_info()[1])) def run(self): """Startup the processing and go!""" @@ -116,10 +111,10 @@ class ReportingCollector(object): self.context = daemon.DaemonContext(detach_process=True) iter = 0 - if self.setup['daemon']: + if Bcfg2.Options.setup.daemon: self.logger.debug("Daemonizing") try: - self.context.pidfile = PIDLockFile(self.setup['daemon']) + self.context.pidfile = PIDLockFile(Bcfg2.Options.setup.daemon) self.context.open() except LockFailed: self.logger.error("Failed to daemonize: %s" % @@ -134,7 +129,7 @@ class ReportingCollector(object): return except PIDFileError: self.logger.error("Error writing pid file: %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) self.shutdown() return self.logger.info("Starting daemon") @@ -146,7 +141,6 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - store_thread = ReportingStoreThread(interaction, self.storage) store_thread.start() self.children.append(store_thread) @@ -161,7 +155,7 @@ class ReportingCollector(object): self.shutdown() except: self.logger.error("Unhandled exception in main loop %s" % - traceback.format_exc().splitlines()[-1]) + sys.exc_info()[1]) def shutdown(self): """Cleanup and go""" diff --git a/src/lib/Bcfg2/Reporting/Reports.py b/src/lib/Bcfg2/Reporting/Reports.py new file mode 100755 index 000000000..35c09a7e1 --- /dev/null +++ b/src/lib/Bcfg2/Reporting/Reports.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python +"""Query reporting system for client status.""" + +import sys +import argparse +import datetime +import Bcfg2.DBSettings + + +def hosts_by_entry_type(clients, etype, entryspec): + result = [] + for entry in entryspec: + for client in clients: + items = getattr(client.current_interaction, etype)() + for item in items: + if (item.entry_type == entry[0] and + item.name == entry[1]): + result.append(client) + return result + + +def print_fields(fields, client, fmt, extra=None): + """ Prints the fields specified in fields of client, max_name + specifies the column width of the name column. """ + fdata = [] + if extra is None: + extra = dict() + for field in fields: + if field == 'time': + fdata.append(str(client.current_interaction.timestamp)) + elif field == 'state': + if client.current_interaction.isclean(): + fdata.append("clean") + else: + fdata.append("dirty") + elif field == 'total': + fdata.append(client.current_interaction.total_count) + elif field == 'good': + fdata.append(client.current_interaction.good_count) + elif field == 'modified': + fdata.append(client.current_interaction.modified_count) + elif field == 'extra': + fdata.append(client.current_interaction.extra_count) + elif field == 'bad': + fdata.append((client.current_interaction.bad_count)) + else: + try: + fdata.append(getattr(client, field)) + except AttributeError: + fdata.append(extra.get(field, "N/A")) + + print(fmt % tuple(fdata)) + + +def print_entries(interaction, etype): + items = getattr(interaction, etype)() + for item in items: + print("%-70s %s" % (item.entry_type + ":" + item.name, etype)) + + +class _SingleHostCmd(Bcfg2.Options.Subcommand): # pylint: disable=W0223 + """ Base class for bcfg2-reports modes that take a single host as + a positional argument """ + options = [Bcfg2.Options.PositionalArgument("host")] + + def get_client(self, setup): + from Bcfg2.Reporting.models import Client + try: + return Client.objects.select_related().get(name=setup.host) + except Client.DoesNotExist: + print("No such host: %s" % setup.host) + raise SystemExit(2) + + +class Show(_SingleHostCmd): + """ Show bad, extra, modified, or all entries from a given host """ + + options = _SingleHostCmd.options + [ + Bcfg2.Options.BooleanOption( + "-b", "--bad", help="Show bad entries from HOST"), + Bcfg2.Options.BooleanOption( + "-e", "--extra", help="Show extra entries from HOST"), + Bcfg2.Options.BooleanOption( + "-m", "--modified", help="Show modified entries from HOST")] + + def run(self, setup): + client = self.get_client(setup) + show_all = not setup.bad and not setup.extra and not setup.modified + if setup.bad or show_all: + print_entries(client.current_interaction, "bad") + if setup.modified or show_all: + print_entries(client.current_interaction, "modified") + if setup.extra or show_all: + print_entries(client.current_interaction, "extra") + + +class Total(_SingleHostCmd): + """ Show total number of managed and good entries from HOST """ + + def run(self, setup): + client = self.get_client(setup) + managed = client.current_interaction.total_count + good = client.current_interaction.good_count + print("Total managed entries: %d (good: %d)" % (managed, good)) + + +class Expire(_SingleHostCmd): + """ Toggle the expired/unexpired state of HOST """ + + def run(self, setup): + client = self.get_client(setup) + if client.expiration is None: + client.expiration = datetime.datetime.now() + print("%s expired." % client.name) + else: + client.expiration = None + print("%s un-expired." % client.name) + client.save() + + +class _ClientSelectCmd(Bcfg2.Options.Subcommand): + """ Base class for subcommands that display lists of clients """ + options = [ + Bcfg2.Options.Option("--fields", metavar="FIELD,FIELD,...", + help="Only display the listed fields", + type=Bcfg2.Options.Types.comma_list, + default=['name', 'time', 'state'])] + + def get_clients(self): + from Bcfg2.Reporting.models import Client + return Client.objects.exclude(current_interaction__isnull=True) + + def display(self, result, fields, extra=None): + if 'name' not in fields: + fields.insert(0, "name") + if not result: + print("No match found") + return + if extra is None: + extra = dict() + max_name = max(len(c.name) for c in result) + ffmt = [] + for field in fields: + if field == "name": + ffmt.append("%%-%ds" % max_name) + elif field == "time": + ffmt.append("%-19s") + else: + ffmt.append("%%-%ds" % len(field)) + fmt = " ".join(ffmt) + print(fmt % tuple(f.title() for f in fields)) + for client in result: + if not client.expiration: + print_fields(fields, client, fmt, + extra=extra.get(client, None)) + + +class Clients(_ClientSelectCmd): + """ Query hosts """ + options = _ClientSelectCmd.options + [ + Bcfg2.Options.BooleanOption( + "-c", "--clean", help="Show only clean hosts"), + Bcfg2.Options.BooleanOption( + "-d", "--dirty", help="Show only dirty hosts"), + Bcfg2.Options.BooleanOption( + "--stale", + help="Show hosts that haven't run in the last 24 hours")] + + def run(self, setup): + result = [] + show_all = not setup.stale and not setup.clean and not setup.dirty + for client in self.get_clients(): + interaction = client.current_interaction + if (show_all or + (setup.stale and interaction.isstale()) or + (setup.clean and interaction.isclean()) or + (setup.dirty and not interaction.isclean())): + result.append(client) + + self.display(result, setup.fields) + + +class Entries(_ClientSelectCmd): + """ Query hosts by entries """ + options = _ClientSelectCmd.options + [ + Bcfg2.Options.BooleanOption( + "--badentry", + help="Show hosts that have bad entries that match"), + Bcfg2.Options.BooleanOption( + "--modifiedentry", + help="Show hosts that have modified entries that match"), + Bcfg2.Options.BooleanOption( + "--extraentry", + help="Show hosts that have extra entries that match"), + Bcfg2.Options.PathOption( + "--file", type=argparse.FileType('r'), + help="Read TYPE:NAME pairs from the specified file instead of " + "from the command line"), + Bcfg2.Options.PositionalArgument( + "entries", metavar="TYPE:NAME", nargs="*")] + + def run(self, setup): + result = [] + if setup.file: + try: + entries = [l.strip().split(":") for l in setup.file] + except IOError: + err = sys.exc_info()[1] + print("Cannot read entries from %s: %s" % (setup.file.name, + err)) + return 2 + else: + entries = [a.split(":") for a in setup.entries] + + clients = self.get_clients() + if setup.badentry: + result = hosts_by_entry_type(clients, "bad", entries) + elif setup.modifiedentry: + result = hosts_by_entry_type(clients, "modified", entries) + elif setup.extraentry: + result = hosts_by_entry_type(clients, "extra", entries) + + self.display(result, setup.fields) + + +class Entry(_ClientSelectCmd): + """ Show the status of a single entry on all hosts """ + + options = _ClientSelectCmd.options + [ + Bcfg2.Options.PositionalArgument( + "entry", metavar="TYPE:NAME", nargs=1)] + + def run(self, setup): + from Bcfg2.Reporting.models import BaseEntry + result = [] + fields = setup.fields + if 'state' in fields: + fields.remove('state') + fields.append("entry state") + + etype, ename = setup.entry[0].split(":") + try: + entry_cls = BaseEntry.entry_from_type(etype) + except ValueError: + print("Unhandled/unknown type %s" % etype) + return 2 + + # TODO: batch fetch this. sqlite could break + extra = dict() + for client in self.get_clients(): + ents = entry_cls.objects.filter( + name=ename, + interaction=client.current_interaction) + if len(ents) == 0: + continue + extra[client] = {"entry state": ents[0].get_state_display(), + "reason": ents[0]} + result.append(client) + + self.display(result, fields, extra=extra) + + +class CLI(Bcfg2.Options.CommandRegistry): + """ CLI class for bcfg2-reports """ + + def __init__(self): + Bcfg2.Options.CommandRegistry.__init__(self) + Bcfg2.Options.register_commands(self.__class__, globals().values()) + parser = Bcfg2.Options.get_parser( + description="Query the Bcfg2 reporting subsystem", + components=[self]) + parser.parse() + + def run(self): + """ Run bcfg2-reports """ + return self.runcommand() diff --git a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py index 2530d2b2b..687b4b571 100644 --- a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py +++ b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py @@ -2,15 +2,11 @@ The base for the original DjangoORM (DBStats) """ -import os -import traceback from lxml import etree from datetime import datetime from time import strptime - -os.environ['DJANGO_SETTINGS_MODULE'] = 'Bcfg2.settings' -from Bcfg2 import settings - +import Bcfg2.Options +import Bcfg2.DBSettings from Bcfg2.Compat import md5 from Bcfg2.Reporting.Storage.base import StorageBase, StorageError from Bcfg2.Server.Plugin.exceptions import PluginExecutionError @@ -28,9 +24,13 @@ from Bcfg2.Reporting.Compat import transaction class DjangoORM(StorageBase): - def __init__(self, setup): - super(DjangoORM, self).__init__(setup) - self.size_limit = setup.get('reporting_file_limit') + options = StorageBase.options + [ + Bcfg2.Options.Common.repository, + Bcfg2.Options.Option( + cf=('reporting', 'file_limit'), + type=Bcfg2.Options.Types.size, + help='Reporting file size limit', + default=1024 * 1024)] def _import_default(self, entry, state, entrytype=None, defaults=None, mapping=None, boolean=None, xforms=None): @@ -185,7 +185,7 @@ class DjangoORM(StorageBase): act_dict['detail_type'] = PathEntry.DETAIL_DIFF cdata = entry.get('current_bdiff') if cdata: - if len(cdata) > self.size_limit: + if len(cdata) > Bcfg2.Options.setup.file_limit: act_dict['detail_type'] = PathEntry.DETAIL_SIZE_LIMIT act_dict['details'] = md5(cdata).hexdigest() else: @@ -365,7 +365,6 @@ class DjangoORM(StorageBase): def import_interaction(self, interaction): """Import the data into the backend""" - try: self._import_interaction(interaction) except: @@ -379,23 +378,21 @@ class DjangoORM(StorageBase): def validate(self): """Validate backend storage. Should be called once when loaded""" - - settings.read_config(repo=self.setup['repo']) - # verify our database schema try: - if self.setup['debug']: + if Bcfg2.Options.setup.debug: vrb = 2 - elif self.setup['verbose']: + elif Bcfg2.Options.setup.verbose: vrb = 1 else: vrb = 0 - management.call_command("syncdb", verbosity=vrb, interactive=False) - management.call_command("migrate", verbosity=vrb, interactive=False) + Bcfg2.DBSettings.sync_databases(verbosity=vrb, interactive=False) + Bcfg2.DBSettings.migrate_databases(verbosity=vrb, + interactive=False) except: - self.logger.error("Failed to update database schema: %s" % \ - traceback.format_exc().splitlines()[-1]) - raise StorageError + msg = "Failed to update database schema: %s" % sys.exc_info()[1] + self.logger.error(msg) + raise StorageError(msg) def GetExtra(self, client): """Fetch extra entries for a client""" diff --git a/src/lib/Bcfg2/Reporting/Storage/__init__.py b/src/lib/Bcfg2/Reporting/Storage/__init__.py index 85356fcfe..953104d4b 100644 --- a/src/lib/Bcfg2/Reporting/Storage/__init__.py +++ b/src/lib/Bcfg2/Reporting/Storage/__init__.py @@ -1,32 +1,3 @@ """ Public storage routines """ - -import traceback - -from Bcfg2.Reporting.Storage.base import StorageError, \ - StorageImportError - -def load_storage(storage_name, setup): - """ - Try to load the storage. Raise StorageImportError on failure - """ - try: - mod_name = "%s.%s" % (__name__, storage_name) - mod = getattr(__import__(mod_name).Reporting.Storage, storage_name) - except ImportError: - try: - mod = __import__(storage_name) - except: - raise StorageImportError("Unavailable") - try: - cls = getattr(mod, storage_name) - return cls(setup) - except: - raise StorageImportError("Storage unavailable: %s" % - traceback.format_exc().splitlines()[-1]) - -def load_storage_from_config(setup): - """Load the storage in the config... eventually""" - return load_storage('DjangoORM', setup) - diff --git a/src/lib/Bcfg2/Reporting/Storage/base.py b/src/lib/Bcfg2/Reporting/Storage/base.py index 92cc3a68b..771f755a1 100644 --- a/src/lib/Bcfg2/Reporting/Storage/base.py +++ b/src/lib/Bcfg2/Reporting/Storage/base.py @@ -2,28 +2,25 @@ The base for all Storage backends """ -import logging +import logging + class StorageError(Exception): """Generic StorageError""" pass -class StorageImportError(StorageError): - """Raised when a storage module fails to import""" - pass - class StorageBase(object): """The base for all storages""" + options = [] + __rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry'] - def __init__(self, setup): + def __init__(self): """Do something here""" clsname = self.__class__.__name__ self.logger = logging.getLogger(clsname) self.logger.debug("Loading %s storage" % clsname) - self.setup = setup - self.encoding = setup['encoding'] def import_interaction(self, interaction): """Import the data into the backend""" @@ -48,4 +45,3 @@ class StorageBase(object): def GetCurrentEntry(self, client, e_type, e_name): """Get the current status of an entry on the client""" raise NotImplementedError - diff --git a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py index 79d1b5aba..b9d17212e 100644 --- a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py +++ b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py @@ -5,18 +5,20 @@ import os import sys import time import threading +import Bcfg2.Options from Bcfg2.Reporting.Transport.base import TransportBase, TransportError -from Bcfg2.Reporting.Storage import load_storage_from_config from Bcfg2.Compat import Queue, Full, Empty, cPickle class DirectStore(TransportBase, threading.Thread): - def __init__(self, setup): - TransportBase.__init__(self, setup) + options = TransportBase.options + [Bcfg2.Options.Common.reporting_storage] + + def __init__(self): + TransportBase.__init__(self) threading.Thread.__init__(self) self.save_file = os.path.join(self.data, ".saved") - self.storage = load_storage_from_config(setup) + self.storage = Bcfg2.Options.setup.reporting_storage() self.storage.validate() self.queue = Queue(100000) @@ -30,10 +32,9 @@ class DirectStore(TransportBase, threading.Thread): def store(self, hostname, metadata, stats): try: - self.queue.put_nowait(dict( - hostname=hostname, - metadata=metadata, - stats=stats)) + self.queue.put_nowait(dict(hostname=hostname, + metadata=metadata, + stats=stats)) except Full: self.logger.warning("Reporting: Queue is full, " "dropping statistics") diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py index c7d5c512a..189967cb0 100644 --- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py +++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py @@ -9,6 +9,7 @@ import os import select import time import traceback +import Bcfg2.Options import Bcfg2.Server.FileMonitor from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError from Bcfg2.Reporting.Transport.base import TransportBase, TransportError @@ -16,8 +17,10 @@ from Bcfg2.Compat import cPickle class LocalFilesystem(TransportBase): - def __init__(self, setup): - super(LocalFilesystem, self).__init__(setup) + options = TransportBase.options + [Bcfg2.Options.Common.filemonitor] + + def __init__(self): + super(LocalFilesystem, self).__init__() self.work_path = "%s/work" % self.data self.debug_log("LocalFilesystem: work path %s" % self.work_path) @@ -42,24 +45,16 @@ class LocalFilesystem(TransportBase): def start_monitor(self, collector): """Start the file monitor. Most of this comes from BaseCore""" - setup = self.setup - try: - fmon = Bcfg2.Server.FileMonitor.available[setup['filemonitor']] - except KeyError: - self.logger.error("File monitor driver %s not available; " - "forcing to default" % setup['filemonitor']) - fmon = Bcfg2.Server.FileMonitor.available['default'] - if self.debug_flag: - self.fmon.set_debug(self.debug_flag) try: - self.fmon = fmon(debug=self.debug_flag) - self.logger.info("Using the %s file monitor" % - self.fmon.__class__.__name__) + self.fmon = Bcfg2.Server.FileMonitor.get_fam() except IOError: - msg = "Failed to instantiate file monitor %s" % \ - setup['filemonitor'] + msg = "Failed to instantiate fam driver %s" % \ + Bcfg2.Options.setup.filemonitor self.logger.error(msg, exc_info=1) raise TransportError(msg) + + if self.debug_flag: + self.fmon.set_debug(self.debug_flag) self.fmon.start() self.fmon.AddMonitor(self.work_path, self) @@ -154,7 +149,7 @@ class LocalFilesystem(TransportBase): """ try: if not self._phony_collector: - self._phony_collector = ReportingCollector(self.setup) + self._phony_collector = ReportingCollector() except ReportingError: raise TransportError except: @@ -176,4 +171,3 @@ class LocalFilesystem(TransportBase): self.logger.error("RPC method %s failed: %s" % (method, traceback.format_exc().splitlines()[-1])) raise TransportError - diff --git a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py index 22d9af57e..7427c2e1d 100644 --- a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py +++ b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py @@ -9,9 +9,9 @@ import signal import platform import traceback import threading +import Bcfg2.Options from Bcfg2.Reporting.Transport.base import TransportBase, TransportError from Bcfg2.Compat import cPickle -from Bcfg2.Options import Option try: import redis @@ -34,9 +34,19 @@ class RedisTransport(TransportBase): STATS_KEY = 'bcfg2_statistics' COMMAND_KEY = 'bcfg2_command' - def __init__(self, setup): - super(RedisTransport, self).__init__(setup) - self._redis = None + options = TransportBase.options + [ + Bcfg2.Options.Option( + cf=('reporting', 'redis_host'), dest="reporting_redis_host", + default='127.0.0.1', help='Reporting Redis host'), + Bcfg2.Options.Option( + cf=('reporting', 'redis_port'), dest="reporting_redis_port", + default=6379, type=int, help='Reporting Redis port'), + Bcfg2.Options.Option( + cf=('reporting', 'redis_db'), dest="reporting_redis_db", + default=0, type=int, help='Reporting Redis DB')] + + def __init__(self): + super(RedisTransport, self).__init__() self._commands = None self.logger.error("Warning: RedisTransport is experimental") @@ -45,36 +55,15 @@ class RedisTransport(TransportBase): self.logger.error("redis python module is not available") raise TransportError - setup.update(dict( - reporting_redis_host=Option( - 'Redis Host', - default='127.0.0.1', - cf=('reporting', 'redis_host')), - reporting_redis_port=Option( - 'Redis Port', - default=6379, - cf=('reporting', 'redis_port')), - reporting_redis_db=Option( - 'Redis DB', - default=0, - cf=('reporting', 'redis_db')), - )) - setup.reparse() - - self._redis_host = setup.get('reporting_redis_host', '127.0.0.1') - try: - self._redis_port = int(setup.get('reporting_redis_port', 6379)) - except ValueError: - self.logger.error("Redis port must be an integer") - raise TransportError - self._redis_db = setup.get('reporting_redis_db', 0) - self._redis = redis.Redis(host=self._redis_host, - port=self._redis_port, db=self._redis_db) + self._redis = redis.Redis( + host=Bcfg2.Options.setup.reporting_redis_host, + port=Bcfg2.Options.setup.reporting_redis_port, + db=Bcfg2.Options.setup.reporting_redis_db) def start_monitor(self, collector): """Start the monitor. Eventaully start the command thread""" - self._commands = threading.Thread(target=self.monitor_thread, + self._commands = threading.Thread(target=self.monitor_thread, args=(self._redis, collector)) self._commands.start() @@ -129,7 +118,7 @@ class RedisTransport(TransportBase): channel = "%s%s" % (platform.node(), int(time.time())) pubsub.subscribe(channel) - self._redis.rpush(RedisTransport.COMMAND_KEY, + self._redis.rpush(RedisTransport.COMMAND_KEY, cPickle.dumps(RedisMessage(channel, method, args, kwargs))) resp = pubsub.listen() @@ -160,7 +149,7 @@ class RedisTransport(TransportBase): continue message = cPickle.loads(payload[1]) if not isinstance(message, RedisMessage): - self.logger.error("Message \"%s\" is not a RedisMessage" % + self.logger.error("Message \"%s\" is not a RedisMessage" % message) if not message.method in collector.storage.__class__.__rmi__ or\ @@ -192,5 +181,3 @@ class RedisTransport(TransportBase): self.logger.error("Unhandled exception in command thread: %s" % traceback.format_exc().splitlines()[-1]) self.logger.info("Command thread shutdown") - - diff --git a/src/lib/Bcfg2/Reporting/Transport/__init__.py b/src/lib/Bcfg2/Reporting/Transport/__init__.py index 73bdd0b3a..04b574ed7 100644 --- a/src/lib/Bcfg2/Reporting/Transport/__init__.py +++ b/src/lib/Bcfg2/Reporting/Transport/__init__.py @@ -1,35 +1,3 @@ """ Public transport routines """ - -import sys -from Bcfg2.Reporting.Transport.base import TransportError, \ - TransportImportError - - -def load_transport(transport_name, setup): - """ - Try to load the transport. Raise TransportImportError on failure - """ - try: - mod_name = "%s.%s" % (__name__, transport_name) - mod = getattr(__import__(mod_name).Reporting.Transport, transport_name) - except ImportError: - try: - mod = __import__(transport_name) - except: - raise TransportImportError("Error importing transport %s: %s" % - (transport_name, sys.exc_info()[1])) - try: - return getattr(mod, transport_name)(setup) - except: - raise TransportImportError("Error instantiating transport %s: %s" % - (transport_name, sys.exc_info()[1])) - - -def load_transport_from_config(setup): - """Load the transport in the config... eventually""" - try: - return load_transport(setup['reporting_transport'], setup) - except KeyError: - raise TransportImportError('Transport missing in config') diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py index 530011e47..9a0a4262f 100644 --- a/src/lib/Bcfg2/Reporting/Transport/base.py +++ b/src/lib/Bcfg2/Reporting/Transport/base.py @@ -4,7 +4,8 @@ The base for all server -> collector Transports import os import sys -from Bcfg2.Server.Plugin import Debuggable +import Bcfg2.Options +from Bcfg2.Logger import Debuggable class TransportError(Exception): @@ -12,20 +13,18 @@ class TransportError(Exception): pass -class TransportImportError(TransportError): - """Raised when a transport fails to import""" - pass - - class TransportBase(Debuggable): """The base for all transports""" - def __init__(self, setup): + options = Debuggable.options + + def __init__(self): """Do something here""" clsname = self.__class__.__name__ Debuggable.__init__(self, name=clsname) self.debug_log("Loading %s transport" % clsname) - self.data = os.path.join(setup['repo'], 'Reporting', clsname) + self.data = os.path.join(Bcfg2.Options.setup.repository, 'Reporting', + clsname) if not os.path.exists(self.data): self.logger.info("%s does not exist, creating" % self.data) try: @@ -34,7 +33,6 @@ class TransportBase(Debuggable): self.logger.warning("Could not create %s: %s" % (self.data, sys.exc_info()[1])) self.logger.warning("The transport may not function properly") - self.setup = setup self.timeout = 2 def start_monitor(self, collector): diff --git a/src/lib/Bcfg2/Reporting/migrations/0002_convert_perms_to_mode.py b/src/lib/Bcfg2/Reporting/migrations/0002_convert_perms_to_mode.py index 668094cf5..37cdd146c 100644 --- a/src/lib/Bcfg2/Reporting/migrations/0002_convert_perms_to_mode.py +++ b/src/lib/Bcfg2/Reporting/migrations/0002_convert_perms_to_mode.py @@ -3,8 +3,7 @@ import datetime from south.db import db from south.v2 import SchemaMigration from django.db import models - -from Bcfg2 import settings +from django.conf import settings class Migration(SchemaMigration): diff --git a/src/lib/Bcfg2/Reporting/models.py b/src/lib/Bcfg2/Reporting/models.py index fc9523067..2d96990b1 100644 --- a/src/lib/Bcfg2/Reporting/models.py +++ b/src/lib/Bcfg2/Reporting/models.py @@ -3,7 +3,7 @@ import sys from django.core.exceptions import ImproperlyConfigured try: - from django.db import models, backend, connection + from django.db import models, backend, connections except ImproperlyConfigured: e = sys.exc_info()[1] print("Reports: unable to import django models: %s" % e) @@ -12,6 +12,7 @@ except ImproperlyConfigured: from django.core.cache import cache from datetime import datetime, timedelta from Bcfg2.Compat import cPickle +from Bcfg2.DBSettings import get_db_label TYPE_GOOD = 0 @@ -61,7 +62,8 @@ def _quote(value): global _our_backend if not _our_backend: try: - _our_backend = backend.DatabaseOperations(connection) + _our_backend = backend.DatabaseOperations( + connections[get_db_label('Reporting')]) except TypeError: _our_backend = backend.DatabaseOperations() return _our_backend.quote_name(value) @@ -91,8 +93,8 @@ class InteractionManager(models.Manager): maxdate -- datetime object. Most recent date to pull. (default None) """ - from django.db import connection - cursor = connection.cursor() + from django.db import connections + cursor = connections[get_db_label('Reporting')].cursor() cfilter = "expiration is null" sql = 'select ri.id, x.client_id from ' + \ @@ -381,7 +383,7 @@ class BaseEntry(models.Model): @classmethod def entry_from_type(cls, etype): - for entry_cls in ENTRY_CLASSES: + for entry_cls in ENTRY_TYPES: if etype == entry_cls.ENTRY_TYPE: return entry_cls else: diff --git a/src/lib/Bcfg2/Reporting/views.py b/src/lib/Bcfg2/Reporting/views.py index c7c2a503f..0b8ed65cc 100644 --- a/src/lib/Bcfg2/Reporting/views.py +++ b/src/lib/Bcfg2/Reporting/views.py @@ -13,7 +13,7 @@ from django.http import \ from django.shortcuts import render_to_response, get_object_or_404 from django.core.urlresolvers import \ resolve, reverse, Resolver404, NoReverseMatch -from django.db import connection, DatabaseError +from django.db import DatabaseError from django.db.models import Q, Count from Bcfg2.Reporting.models import * |