diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/Bcfg2/Reporting/Collector.py | 39 |
1 files changed, 29 insertions, 10 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py index 4556cda82..68e1d6a6d 100644 --- a/src/lib/Bcfg2/Reporting/Collector.py +++ b/src/lib/Bcfg2/Reporting/Collector.py @@ -20,10 +20,36 @@ from Bcfg2.Reporting.Transport.DirectStore import DirectStore from Bcfg2.Reporting.Storage import load_storage_from_config, \ StorageError, StorageImportError + class ReportingError(Exception): """Generic reporting exception""" pass + +class ReportingStoreThread(threading.Thread): + """Thread for calling the storage backend""" + def __init__(self, interaction, storage, group=None, target=None, + name=None, *args, **kwargs): + """Initialize the thread with a reference to the interaction + as well as the storage engine to use""" + threading.Thread.__init__(self, group, target, name, args, kwargs) + self.interaction = interaction + self.storage = storage + self.logger = logging.getLogger('bcfg2-report-collector') + + def run(self): + try: + start = time.time() + self.storage.import_interaction(self.interaction) + self.logger.info("Imported interaction for %s in %ss" % + (self.interaction.get('hostname', '<unknown>'), + time.time() - start)) + except: + #TODO requeue? + self.logger.error("Unhandled exception in import thread %s" % + traceback.format_exc().splitlines()[-1]) + + class ReportingCollector(object): """The collecting process for reports""" @@ -77,7 +103,6 @@ class ReportingCollector(object): (self.storage.__class__.__name__, traceback.format_exc().splitlines()[-1])) - def run(self): """Startup the processing and go!""" self.terminate = threading.Event() @@ -103,15 +128,9 @@ class ReportingCollector(object): interaction = self.transport.fetch() if not interaction: continue - try: - start = time.time() - self.storage.import_interaction(interaction) - self.logger.info("Imported interaction for %s in %ss" % - (interaction.get('hostname', '<unknown>'), - time.time() - start)) - except: - #TODO requeue? - raise + + t = ReportingStoreThread(interaction, self.storage) + t.start() except (SystemExit, KeyboardInterrupt): self.logger.info("Shutting down") self.shutdown() |