-rw-r--r--  src/lib/Bcfg2/Reporting/Collector.py                   58
-rw-r--r--  src/lib/Bcfg2/Reporting/Storage/DjangoORM.py           30
-rw-r--r--  src/lib/Bcfg2/Reporting/Storage/__init__.py            29
-rw-r--r--  src/lib/Bcfg2/Reporting/Storage/base.py                14
-rw-r--r--  src/lib/Bcfg2/Reporting/Transport/DirectStore.py       17
-rw-r--r--  src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py   31
-rw-r--r--  src/lib/Bcfg2/Reporting/Transport/RedisTransport.py    55
-rw-r--r--  src/lib/Bcfg2/Reporting/Transport/__init__.py          32
-rw-r--r--  src/lib/Bcfg2/Reporting/Transport/base.py              15
-rwxr-xr-x  src/sbin/bcfg2-report-collector                        14
10 files changed, 104 insertions, 191 deletions
diff --git a/src/lib/Bcfg2/Reporting/Collector.py b/src/lib/Bcfg2/Reporting/Collector.py
index 3d224432e..a1e6025e3 100644
--- a/src/lib/Bcfg2/Reporting/Collector.py
+++ b/src/lib/Bcfg2/Reporting/Collector.py
@@ -1,8 +1,8 @@
+import sys
import atexit
import daemon
import logging
import time
-import traceback
import threading
# pylint: disable=E0611
@@ -14,52 +14,53 @@ except ImportError:
# pylint: enable=E0611
import Bcfg2.Logger
-from Bcfg2.Reporting.Transport import load_transport_from_config, \
- TransportError, TransportImportError
+import Bcfg2.Options
+from Bcfg2.Reporting.Transport.base import TransportError
from Bcfg2.Reporting.Transport.DirectStore import DirectStore
-from Bcfg2.Reporting.Storage import load_storage_from_config, \
- StorageError, StorageImportError
+from Bcfg2.Reporting.Storage.base import StorageError
+
class ReportingError(Exception):
"""Generic reporting exception"""
pass
+
class ReportingCollector(object):
"""The collecting process for reports"""
+ options = [Bcfg2.Options.Common.reporting_storage,
+ Bcfg2.Options.Common.reporting_transport,
+ Bcfg2.Options.Common.daemon]
- def __init__(self, setup):
- """Setup the collector. This may be called by the daemon or though
+ def __init__(self):
+ """Setup the collector. This may be called by the daemon or though
bcfg2-admin"""
- self.setup = setup
- self.datastore = setup['repo']
- self.encoding = setup['encoding']
self.terminate = None
self.context = None
- if setup['debug']:
+ if Bcfg2.Options.setup.debug:
level = logging.DEBUG
- elif setup['verbose']:
+ elif Bcfg2.Options.setup.verbose:
level = logging.INFO
else:
level = logging.WARNING
Bcfg2.Logger.setup_logging('bcfg2-report-collector',
to_console=logging.INFO,
- to_syslog=setup['syslog'],
- to_file=setup['logging'],
+ to_syslog=Bcfg2.Options.setup.syslog,
+ to_file=Bcfg2.Options.setup.logging,
level=level)
self.logger = logging.getLogger('bcfg2-report-collector')
try:
- self.transport = load_transport_from_config(setup)
- self.storage = load_storage_from_config(setup)
+ self.transport = Bcfg2.Options.setup.transport()
+ self.storage = Bcfg2.Options.setup.reporting_storage()
except TransportError:
self.logger.error("Failed to load transport: %s" %
- traceback.format_exc().splitlines()[-1])
+ sys.exc_info()[1])
raise ReportingError
except StorageError:
self.logger.error("Failed to load storage: %s" %
- traceback.format_exc().splitlines()[-1])
+ sys.exc_info()[1])
raise ReportingError
if isinstance(self.transport, DirectStore):
@@ -69,14 +70,13 @@ class ReportingCollector(object):
raise ReportingError
try:
- self.logger.debug("Validating storage %s" %
- self.storage.__class__.__name__)
+ self.logger.debug("Validating storage %s" %
+ self.storage.__class__.__name__)
self.storage.validate()
except:
self.logger.error("Storage backed %s failed to validate: %s" %
- (self.storage.__class__.__name__,
- traceback.format_exc().splitlines()[-1]))
-
+ (self.storage.__class__.__name__,
+ sys.exc_info()[1]))
def run(self):
"""Startup the processing and go!"""
@@ -84,14 +84,14 @@ class ReportingCollector(object):
atexit.register(self.shutdown)
self.context = daemon.DaemonContext()
- if self.setup['daemon']:
+ if Bcfg2.Options.setup.daemon:
self.logger.debug("Daemonizing")
try:
- self.context.pidfile = PIDLockFile(self.setup['daemon'])
+ self.context.pidfile = PIDLockFile(Bcfg2.Options.setup.daemon)
self.context.open()
except PIDFileError:
self.logger.error("Error writing pid file: %s" %
- traceback.format_exc().splitlines()[-1])
+ sys.exc_info()[1])
self.shutdown()
return
self.logger.info("Starting daemon")
@@ -107,8 +107,8 @@ class ReportingCollector(object):
start = time.time()
self.storage.import_interaction(interaction)
self.logger.info("Imported interaction for %s in %ss" %
- (interaction.get('hostname', '<unknown>'),
- time.time() - start))
+ (interaction.get('hostname', '<unknown>'),
+ time.time() - start))
except:
#TODO requeue?
raise
@@ -117,7 +117,7 @@ class ReportingCollector(object):
self.shutdown()
except:
self.logger.error("Unhandled exception in main loop %s" %
- traceback.format_exc().splitlines()[-1])
+ sys.exc_info()[1])
def shutdown(self):
"""Cleanup and go"""
diff --git a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
index aea5e9d4b..9505682a7 100644
--- a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
+++ b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
@@ -11,6 +11,7 @@ from time import strptime
os.environ['DJANGO_SETTINGS_MODULE'] = 'Bcfg2.settings'
from Bcfg2 import settings
+import Bcfg2.Options
from Bcfg2.Compat import md5
from Bcfg2.Reporting.Storage.base import StorageBase, StorageError
from Bcfg2.Server.Plugin.exceptions import PluginExecutionError
@@ -27,9 +28,13 @@ from Bcfg2.Reporting.models import *
class DjangoORM(StorageBase):
- def __init__(self, setup):
- super(DjangoORM, self).__init__(setup)
- self.size_limit = setup.get('reporting_file_limit')
+ options = StorageBase.options + [
+ Bcfg2.Options.Common.repository,
+ Bcfg2.Options.Option(
+ cf=('reporting', 'file_limit'),
+ type=Bcfg2.Options.Types.size,
+ help='Reporting file size limit',
+ default=1024 * 1024)]
def _import_default(self, entry, state, entrytype=None, defaults=None,
mapping=None, boolean=None, xforms=None):
@@ -184,7 +189,7 @@ class DjangoORM(StorageBase):
act_dict['detail_type'] = PathEntry.DETAIL_DIFF
cdata = entry.get('current_bdiff')
if cdata:
- if len(cdata) > self.size_limit:
+ if len(cdata) > Bcfg2.Options.setup.file_limit:
act_dict['detail_type'] = PathEntry.DETAIL_SIZE_LIMIT
act_dict['details'] = md5(cdata).hexdigest()
else:
@@ -364,31 +369,31 @@ class DjangoORM(StorageBase):
def import_interaction(self, interaction):
"""Import the data into the backend"""
-
try:
self._import_interaction(interaction)
except:
self.logger.error("Failed to import interaction: %s" %
- traceback.format_exc().splitlines()[-1])
+ sys.exc_info()[1])
def validate(self):
"""Validate backend storage. Should be called once when loaded"""
- settings.read_config(repo=self.setup['repo'])
+ settings.read_config(repo=Bcfg2.Options.setup.repository)
# verify our database schema
try:
- if self.setup['debug']:
+ if Bcfg2.Options.setup.debug:
vrb = 2
- elif self.setup['verbose']:
+ elif Bcfg2.Options.setup.verbose:
vrb = 1
else:
vrb = 0
management.call_command("syncdb", verbosity=vrb, interactive=False)
- management.call_command("migrate", verbosity=vrb, interactive=False)
+ management.call_command("migrate", verbosity=vrb,
+ interactive=False)
except:
- self.logger.error("Failed to update database schema: %s" % \
- traceback.format_exc().splitlines()[-1])
+ self.logger.error("Failed to update database schema: %s" %
+ sys.exc_info()[1])
raise StorageError
def GetExtra(self, client):
@@ -451,4 +456,3 @@ class DjangoORM(StorageBase):
else:
ret.append(None)
return ret
-
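
The reporting file-size limit is now declared as an option instead of being read from the setup dict. A sketch of how such an option is declared and later consumed, mirroring the DjangoORM hunk above (detail_for is a hypothetical helper, and the exact semantics of Bcfg2.Options.Types.size are assumed):

    import Bcfg2.Options
    from Bcfg2.Compat import md5

    file_limit = Bcfg2.Options.Option(
        cf=('reporting', 'file_limit'),   # read from [reporting] file_limit
        type=Bcfg2.Options.Types.size,    # size-typed value (assumed semantics)
        default=1024 * 1024,
        help='Reporting file size limit')

    def detail_for(cdata):
        # hypothetical helper: store only a digest once the limit is exceeded
        if len(cdata) > Bcfg2.Options.setup.file_limit:
            return md5(cdata).hexdigest()
        return cdata
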
diff --git a/src/lib/Bcfg2/Reporting/Storage/__init__.py b/src/lib/Bcfg2/Reporting/Storage/__init__.py
index 85356fcfe..953104d4b 100644
--- a/src/lib/Bcfg2/Reporting/Storage/__init__.py
+++ b/src/lib/Bcfg2/Reporting/Storage/__init__.py
@@ -1,32 +1,3 @@
"""
Public storage routines
"""
-
-import traceback
-
-from Bcfg2.Reporting.Storage.base import StorageError, \
- StorageImportError
-
-def load_storage(storage_name, setup):
- """
- Try to load the storage. Raise StorageImportError on failure
- """
- try:
- mod_name = "%s.%s" % (__name__, storage_name)
- mod = getattr(__import__(mod_name).Reporting.Storage, storage_name)
- except ImportError:
- try:
- mod = __import__(storage_name)
- except:
- raise StorageImportError("Unavailable")
- try:
- cls = getattr(mod, storage_name)
- return cls(setup)
- except:
- raise StorageImportError("Storage unavailable: %s" %
- traceback.format_exc().splitlines()[-1])
-
-def load_storage_from_config(setup):
- """Load the storage in the config... eventually"""
- return load_storage('DjangoORM', setup)
-
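
With load_storage() and load_storage_from_config() removed, the backend class itself is the option value and is instantiated directly, as Collector.py and DirectStore.py do above. A minimal sketch, assuming Bcfg2.Options.Common.reporting_storage resolves to the backend class after parsing:

    import Bcfg2.Options

    def get_storage():
        # the option value is the storage class (e.g. DjangoORM); calling it
        # returns a configured instance, replacing load_storage_from_config()
        storage = Bcfg2.Options.setup.reporting_storage()
        storage.validate()
        return storage
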
diff --git a/src/lib/Bcfg2/Reporting/Storage/base.py b/src/lib/Bcfg2/Reporting/Storage/base.py
index 92cc3a68b..771f755a1 100644
--- a/src/lib/Bcfg2/Reporting/Storage/base.py
+++ b/src/lib/Bcfg2/Reporting/Storage/base.py
@@ -2,28 +2,25 @@
The base for all Storage backends
"""
-import logging
+import logging
+
class StorageError(Exception):
"""Generic StorageError"""
pass
-class StorageImportError(StorageError):
- """Raised when a storage module fails to import"""
- pass
-
class StorageBase(object):
"""The base for all storages"""
+ options = []
+
__rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry']
- def __init__(self, setup):
+ def __init__(self):
"""Do something here"""
clsname = self.__class__.__name__
self.logger = logging.getLogger(clsname)
self.logger.debug("Loading %s storage" % clsname)
- self.setup = setup
- self.encoding = setup['encoding']
def import_interaction(self, interaction):
"""Import the data into the backend"""
@@ -48,4 +45,3 @@ class StorageBase(object):
def GetCurrentEntry(self, client, e_type, e_name):
"""Get the current status of an entry on the client"""
raise NotImplementedError
-
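
A backend written against the new StorageBase declares its extra options at class level and no longer receives a setup dict. A sketch of the shape the DjangoORM hunk above follows (NullStorage is a hypothetical example, not a real backend):

    import Bcfg2.Options
    from Bcfg2.Reporting.Storage.base import StorageBase

    class NullStorage(StorageBase):
        # extend the base option list instead of reading from self.setup
        options = StorageBase.options + [Bcfg2.Options.Common.repository]

        def import_interaction(self, interaction):
            self.logger.debug("Discarding interaction for %s" %
                              interaction.get('hostname', '<unknown>'))
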
diff --git a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
index 79d1b5aba..b9d17212e 100644
--- a/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
+++ b/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
@@ -5,18 +5,20 @@ import os
import sys
import time
import threading
+import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
-from Bcfg2.Reporting.Storage import load_storage_from_config
from Bcfg2.Compat import Queue, Full, Empty, cPickle
class DirectStore(TransportBase, threading.Thread):
- def __init__(self, setup):
- TransportBase.__init__(self, setup)
+ options = TransportBase.options + [Bcfg2.Options.Common.reporting_storage]
+
+ def __init__(self):
+ TransportBase.__init__(self)
threading.Thread.__init__(self)
self.save_file = os.path.join(self.data, ".saved")
- self.storage = load_storage_from_config(setup)
+ self.storage = Bcfg2.Options.setup.reporting_storage()
self.storage.validate()
self.queue = Queue(100000)
@@ -30,10 +32,9 @@ class DirectStore(TransportBase, threading.Thread):
def store(self, hostname, metadata, stats):
try:
- self.queue.put_nowait(dict(
- hostname=hostname,
- metadata=metadata,
- stats=stats))
+ self.queue.put_nowait(dict(hostname=hostname,
+ metadata=metadata,
+ stats=stats))
except Full:
self.logger.warning("Reporting: Queue is full, "
"dropping statistics")
diff --git a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
index c7d5c512a..d901ded56 100644
--- a/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
+++ b/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
@@ -9,6 +9,8 @@ import os
import select
import time
import traceback
+import Bcfg2.Options
+import Bcfg2.CommonOptions
import Bcfg2.Server.FileMonitor
from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
@@ -16,8 +18,10 @@ from Bcfg2.Compat import cPickle
class LocalFilesystem(TransportBase):
- def __init__(self, setup):
- super(LocalFilesystem, self).__init__(setup)
+ options = TransportBase.options + [Bcfg2.Options.Common.filemonitor]
+
+ def __init__(self):
+ super(LocalFilesystem, self).__init__()
self.work_path = "%s/work" % self.data
self.debug_log("LocalFilesystem: work path %s" % self.work_path)
@@ -42,24 +46,16 @@ class LocalFilesystem(TransportBase):
def start_monitor(self, collector):
"""Start the file monitor. Most of this comes from BaseCore"""
- setup = self.setup
- try:
- fmon = Bcfg2.Server.FileMonitor.available[setup['filemonitor']]
- except KeyError:
- self.logger.error("File monitor driver %s not available; "
- "forcing to default" % setup['filemonitor'])
- fmon = Bcfg2.Server.FileMonitor.available['default']
- if self.debug_flag:
- self.fmon.set_debug(self.debug_flag)
try:
- self.fmon = fmon(debug=self.debug_flag)
- self.logger.info("Using the %s file monitor" %
- self.fmon.__class__.__name__)
+ self.fmon = Bcfg2.Server.FileMonitor.get_fam()
except IOError:
- msg = "Failed to instantiate file monitor %s" % \
- setup['filemonitor']
+ msg = "Failed to instantiate fam driver %s" % \
+ Bcfg2.Options.setup.filemonitor
self.logger.error(msg, exc_info=1)
raise TransportError(msg)
+
+ if self.debug_flag:
+ self.fmon.set_debug(self.debug_flag)
self.fmon.start()
self.fmon.AddMonitor(self.work_path, self)
@@ -154,7 +150,7 @@ class LocalFilesystem(TransportBase):
"""
try:
if not self._phony_collector:
- self._phony_collector = ReportingCollector(self.setup)
+ self._phony_collector = ReportingCollector()
except ReportingError:
raise TransportError
except:
@@ -176,4 +172,3 @@ class LocalFilesystem(TransportBase):
self.logger.error("RPC method %s failed: %s" %
(method, traceback.format_exc().splitlines()[-1]))
raise TransportError
-
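
start_monitor() no longer selects a driver itself; it asks Bcfg2.Server.FileMonitor for the already-configured FAM instance. A sketch of the new flow, using only the calls visible in the hunk above (work_path and handler are placeholders):

    import Bcfg2.Server.FileMonitor
    from Bcfg2.Reporting.Transport.base import TransportError

    def start_monitor(work_path, handler, debug=False):
        try:
            fmon = Bcfg2.Server.FileMonitor.get_fam()
        except IOError:
            raise TransportError("Failed to instantiate fam driver")
        if debug:
            fmon.set_debug(debug)
        fmon.start()
        fmon.AddMonitor(work_path, handler)
        return fmon
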
diff --git a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py
index 22d9af57e..7427c2e1d 100644
--- a/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py
+++ b/src/lib/Bcfg2/Reporting/Transport/RedisTransport.py
@@ -9,9 +9,9 @@ import signal
import platform
import traceback
import threading
+import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import cPickle
-from Bcfg2.Options import Option
try:
import redis
@@ -34,9 +34,19 @@ class RedisTransport(TransportBase):
STATS_KEY = 'bcfg2_statistics'
COMMAND_KEY = 'bcfg2_command'
- def __init__(self, setup):
- super(RedisTransport, self).__init__(setup)
- self._redis = None
+ options = TransportBase.options + [
+ Bcfg2.Options.Option(
+ cf=('reporting', 'redis_host'), dest="reporting_redis_host",
+ default='127.0.0.1', help='Reporting Redis host'),
+ Bcfg2.Options.Option(
+ cf=('reporting', 'redis_port'), dest="reporting_redis_port",
+ default=6379, type=int, help='Reporting Redis port'),
+ Bcfg2.Options.Option(
+ cf=('reporting', 'redis_db'), dest="reporting_redis_db",
+ default=0, type=int, help='Reporting Redis DB')]
+
+ def __init__(self):
+ super(RedisTransport, self).__init__()
self._commands = None
self.logger.error("Warning: RedisTransport is experimental")
@@ -45,36 +55,15 @@ class RedisTransport(TransportBase):
self.logger.error("redis python module is not available")
raise TransportError
- setup.update(dict(
- reporting_redis_host=Option(
- 'Redis Host',
- default='127.0.0.1',
- cf=('reporting', 'redis_host')),
- reporting_redis_port=Option(
- 'Redis Port',
- default=6379,
- cf=('reporting', 'redis_port')),
- reporting_redis_db=Option(
- 'Redis DB',
- default=0,
- cf=('reporting', 'redis_db')),
- ))
- setup.reparse()
-
- self._redis_host = setup.get('reporting_redis_host', '127.0.0.1')
- try:
- self._redis_port = int(setup.get('reporting_redis_port', 6379))
- except ValueError:
- self.logger.error("Redis port must be an integer")
- raise TransportError
- self._redis_db = setup.get('reporting_redis_db', 0)
- self._redis = redis.Redis(host=self._redis_host,
- port=self._redis_port, db=self._redis_db)
+ self._redis = redis.Redis(
+ host=Bcfg2.Options.setup.reporting_redis_host,
+ port=Bcfg2.Options.setup.reporting_redis_port,
+ db=Bcfg2.Options.setup.reporting_redis_db)
def start_monitor(self, collector):
"""Start the monitor. Eventaully start the command thread"""
- self._commands = threading.Thread(target=self.monitor_thread,
+ self._commands = threading.Thread(target=self.monitor_thread,
args=(self._redis, collector))
self._commands.start()
@@ -129,7 +118,7 @@ class RedisTransport(TransportBase):
channel = "%s%s" % (platform.node(), int(time.time()))
pubsub.subscribe(channel)
- self._redis.rpush(RedisTransport.COMMAND_KEY,
+ self._redis.rpush(RedisTransport.COMMAND_KEY,
cPickle.dumps(RedisMessage(channel, method, args, kwargs)))
resp = pubsub.listen()
@@ -160,7 +149,7 @@ class RedisTransport(TransportBase):
continue
message = cPickle.loads(payload[1])
if not isinstance(message, RedisMessage):
- self.logger.error("Message \"%s\" is not a RedisMessage" %
+ self.logger.error("Message \"%s\" is not a RedisMessage" %
message)
if not message.method in collector.storage.__class__.__rmi__ or\
@@ -192,5 +181,3 @@ class RedisTransport(TransportBase):
self.logger.error("Unhandled exception in command thread: %s" %
traceback.format_exc().splitlines()[-1])
self.logger.info("Command thread shutdown")
-
-
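
The Redis connection parameters are now ordinary options rather than being injected into the setup dict and reparsed at construction time. A sketch of how the transport connects once parsing has happened, with the option names as declared in the hunk above:

    import redis
    import Bcfg2.Options

    def connect():
        return redis.Redis(
            host=Bcfg2.Options.setup.reporting_redis_host,
            port=Bcfg2.Options.setup.reporting_redis_port,
            db=Bcfg2.Options.setup.reporting_redis_db)
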
diff --git a/src/lib/Bcfg2/Reporting/Transport/__init__.py b/src/lib/Bcfg2/Reporting/Transport/__init__.py
index 73bdd0b3a..04b574ed7 100644
--- a/src/lib/Bcfg2/Reporting/Transport/__init__.py
+++ b/src/lib/Bcfg2/Reporting/Transport/__init__.py
@@ -1,35 +1,3 @@
"""
Public transport routines
"""
-
-import sys
-from Bcfg2.Reporting.Transport.base import TransportError, \
- TransportImportError
-
-
-def load_transport(transport_name, setup):
- """
- Try to load the transport. Raise TransportImportError on failure
- """
- try:
- mod_name = "%s.%s" % (__name__, transport_name)
- mod = getattr(__import__(mod_name).Reporting.Transport, transport_name)
- except ImportError:
- try:
- mod = __import__(transport_name)
- except:
- raise TransportImportError("Error importing transport %s: %s" %
- (transport_name, sys.exc_info()[1]))
- try:
- return getattr(mod, transport_name)(setup)
- except:
- raise TransportImportError("Error instantiating transport %s: %s" %
- (transport_name, sys.exc_info()[1]))
-
-
-def load_transport_from_config(setup):
- """Load the transport in the config... eventually"""
- try:
- return load_transport(setup['reporting_transport'], setup)
- except KeyError:
- raise TransportImportError('Transport missing in config')
diff --git a/src/lib/Bcfg2/Reporting/Transport/base.py b/src/lib/Bcfg2/Reporting/Transport/base.py
index 530011e47..9fbf8c9d5 100644
--- a/src/lib/Bcfg2/Reporting/Transport/base.py
+++ b/src/lib/Bcfg2/Reporting/Transport/base.py
@@ -4,7 +4,7 @@ The base for all server -> collector Transports
import os
import sys
-from Bcfg2.Server.Plugin import Debuggable
+from Bcfg2.Logger import Debuggable
class TransportError(Exception):
@@ -12,20 +12,18 @@ class TransportError(Exception):
pass
-class TransportImportError(TransportError):
- """Raised when a transport fails to import"""
- pass
-
-
class TransportBase(Debuggable):
"""The base for all transports"""
- def __init__(self, setup):
+ options = Debuggable.options
+
+ def __init__(self):
"""Do something here"""
clsname = self.__class__.__name__
Debuggable.__init__(self, name=clsname)
self.debug_log("Loading %s transport" % clsname)
- self.data = os.path.join(setup['repo'], 'Reporting', clsname)
+ self.data = os.path.join(Bcfg2.Options.setup.repository, 'Reporting',
+ clsname)
if not os.path.exists(self.data):
self.logger.info("%s does not exist, creating" % self.data)
try:
@@ -34,7 +32,6 @@ class TransportBase(Debuggable):
self.logger.warning("Could not create %s: %s" %
(self.data, sys.exc_info()[1]))
self.logger.warning("The transport may not function properly")
- self.setup = setup
self.timeout = 2
def start_monitor(self, collector):
diff --git a/src/sbin/bcfg2-report-collector b/src/sbin/bcfg2-report-collector
index ae6d3b167..00e015100 100755
--- a/src/sbin/bcfg2-report-collector
+++ b/src/sbin/bcfg2-report-collector
@@ -11,20 +11,14 @@ from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError
def main():
+ parser = Bcfg2.Options.get_parser(description="Collect Bcfg2 report data",
+ components=[ReportingCollector])
+ parser.parse()
logger = logging.getLogger('bcfg2-report-collector')
- optinfo = dict(daemon=Bcfg2.Options.DAEMON,
- repo=Bcfg2.Options.SERVER_REPOSITORY,
- filemonitor=Bcfg2.Options.SERVER_FILEMONITOR,
- web_configfile=Bcfg2.Options.WEB_CFILE)
- optinfo.update(Bcfg2.Options.CLI_COMMON_OPTIONS)
- optinfo.update(Bcfg2.Options.REPORTING_COMMON_OPTIONS)
- setup = Bcfg2.Options.load_option_parser(optinfo)
- setup.parse()
# run collector
try:
- collector = ReportingCollector(setup)
- collector.run()
+ ReportingCollector().run()
except ReportingError:
msg = sys.exc_info()[1]
logger.error(msg)
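
The entry point shrinks to building a parser from the collector's own option list and running it. A sketch of the full script flow; the exit path after the error is logged is assumed, since the tail of the handler is not shown in this diff:

    import sys
    import logging
    import Bcfg2.Options
    from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError

    def main():
        parser = Bcfg2.Options.get_parser(
            description="Collect Bcfg2 report data",
            components=[ReportingCollector])
        parser.parse()
        logger = logging.getLogger('bcfg2-report-collector')
        try:
            ReportingCollector().run()
        except ReportingError:
            logger.error(sys.exc_info()[1])
            raise SystemExit(1)   # assumed exit path; not shown in the hunk above

    if __name__ == '__main__':
        main()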