summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJames Yang <jjyang@mcs.anl.gov>2008-08-08 21:32:13 +0000
committerJames Yang <jjyang@mcs.anl.gov>2008-08-08 21:32:13 +0000
commitbd8efd295e7e28c8d8b6c76f818087ee27e7e23b (patch)
tree24798f00cd55baaafadfcefd311e7f5afb6d9721
parente7291a122516106ca2d515c26f3d93318fce4d29 (diff)
downloadbcfg2-bd8efd295e7e28c8d8b6c76f818087ee27e7e23b.tar.gz
bcfg2-bd8efd295e7e28c8d8b6c76f818087ee27e7e23b.tar.bz2
bcfg2-bd8efd295e7e28c8d8b6c76f818087ee27e7e23b.zip
Created working static fam infrastructure for Ticket #536
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@4875 ce84e21b-d406-0410-9b95-82705330c041
-rw-r--r--src/lib/Server/Core.py183
-rw-r--r--src/lib/Server/FileMonitor.py235
2 files changed, 239 insertions, 179 deletions
diff --git a/src/lib/Server/Core.py b/src/lib/Server/Core.py
index b85198178..5687cba9d 100644
--- a/src/lib/Server/Core.py
+++ b/src/lib/Server/Core.py
@@ -2,9 +2,11 @@
__revision__ = '$Revision$'
from time import time
+
from Bcfg2.Server.Plugin import PluginInitError, PluginExecutionError
+import Bcfg2.Server.FileMonitor
-import logging, lxml.etree, os, stat
+import logging, lxml.etree, os
import Bcfg2.Server.Plugins.Metadata
logger = logging.getLogger('Bcfg2.Core')
@@ -15,187 +17,10 @@ try:
except:
pass
-def ShouldIgnore(event):
- '''Test if the event should be suppresed'''
- # FIXME should move event suppression out of the core
- if event.filename.split('/')[-1] == '.svn':
- return True
- if event.filename.endswith('~') or event.filename.endswith('.tmp') \
- or event.filename.startswith('#') or event.filename.startswith('.#'):
- logger.error("Suppressing event for file %s" % (event.filename))
- return True
- return False
-
class CoreInitError(Exception):
'''This error is raised when the core cannot be initialized'''
pass
-class FamFam(object):
- '''The fam object is a set of callbacks for file alteration events (FAM support)'''
-
- def __init__(self):
- object.__init__(self)
- self.fm = _fam.open()
- self.users = {}
- self.handles = {}
- self.debug = False
-
- def fileno(self):
- '''return fam file handle number'''
- return self.fm.fileno()
-
- def AddMonitor(self, path, obj):
- '''add a monitor to path, installing a callback to obj.HandleEvent'''
- mode = os.stat(path)[stat.ST_MODE]
- if stat.S_ISDIR(mode):
- handle = self.fm.monitorDirectory(path, None)
- else:
- handle = self.fm.monitorFile(path, None)
- self.handles[handle.requestID()] = handle
- if obj != None:
- self.users[handle.requestID()] = obj
- return handle.requestID()
-
- def Service(self):
- '''Handle all fam work'''
- count = 0
- collapsed = 0
- rawevents = []
- start = time()
- now = time()
- while (time() - now) < 0.10:
- if self.fm.pending():
- while self.fm.pending():
- count += 1
- rawevents.append(self.fm.nextEvent())
- now = time()
- unique = []
- bookkeeping = []
- for event in rawevents:
- if ShouldIgnore(event):
- continue
- if event.code2str() != 'changed':
- # process all non-change events
- unique.append(event)
- else:
- if (event.filename, event.requestID) not in bookkeeping:
- bookkeeping.append((event.filename, event.requestID))
- unique.append(event)
- else:
- collapsed += 1
- for event in unique:
- if self.users.has_key(event.requestID):
- try:
- self.users[event.requestID].HandleEvent(event)
- except:
- logger.error("handling event for file %s" % (event.filename), exc_info=1)
- end = time()
- logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" %
- (count, (end - start), collapsed))
- return count
-
-class GaminEvent(object):
- '''This class provides an event analogous to python-fam events based on gamin sources'''
- def __init__(self, request_id, filename, code):
- action_map = {GAMCreated: 'created', GAMExists: 'exists', GAMChanged: 'changed',
- GAMDeleted: 'deleted', GAMEndExist: 'endExist', GAMMoved: 'moved'}
- self.requestID = request_id
- self.filename = filename
- if action_map.has_key(code):
- self.action = action_map[code]
-
- def code2str(self):
- '''return static code for event'''
- return self.action
-
-class GaminFam(object):
- '''The fam object is a set of callbacks for file alteration events (Gamin support)'''
- def __init__(self):
- object.__init__(self)
- self.mon = WatchMonitor()
- self.handles = {}
- self.counter = 0
- self.events = []
- self.debug = False
-
- def fileno(self):
- '''return fam file handle number'''
- return self.mon.get_fd()
-
- def queue(self, path, action, request_id):
- '''queue up the event for later handling'''
- self.events.append(GaminEvent(request_id, path, action))
-
- def AddMonitor(self, path, obj):
- '''add a monitor to path, installing a callback to obj.HandleEvent'''
- handle = self.counter
- self.counter += 1
- mode = os.stat(path)[stat.ST_MODE]
- if stat.S_ISDIR(mode):
- self.mon.watch_directory(path, self.queue, handle)
- else:
- self.mon.watch_file(path, self.queue, handle)
- self.handles[handle] = obj
- return handle
-
- def Service(self):
- '''Handle all gamin work'''
- count = 0
- collapsed = 0
- start = time()
- now = time()
- while (time() - now) < 0.10:
- if self.mon.event_pending():
- while self.mon.event_pending():
- count += 1
- self.mon.handle_one_event()
- now = time()
- unique = []
- bookkeeping = []
- for event in self.events:
- if ShouldIgnore(event):
- continue
- if event.code2str() != 'changed':
- # process all non-change events
- unique.append(event)
- else:
- if (event.filename, event.requestID) not in bookkeeping:
- bookkeeping.append((event.filename, event.requestID))
- unique.append(event)
- else:
- collapsed += 1
- self.events = []
- for event in unique:
- if event.requestID not in self.handles:
- logger.info("Got event for unexpected id %s, file %s" %
- (event.requestID, event.filename))
- continue
- if self.debug:
- logger.info("Dispatching event %s %s to obj %s" \
- % (event.code2str(), event.filename,
- self.handles[event.requestID]))
- try:
- self.handles[event.requestID].HandleEvent(event)
- except:
- logger.error("error in handling of gamin event for %s" % \
- (event.filename), exc_info=1)
- end = time()
- logger.info("Processed %s gamin events in %03.03f seconds. %s collapsed" %
- (count, (end - start), collapsed))
- return count
-
-try:
- from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted, GAMMoved
- monitor = GaminFam
-except ImportError:
- # fall back to _fam
- try:
- import _fam
- monitor = FamFam
- except ImportError:
- print "Couldn't locate Fam module, exiting"
- raise SystemExit, 1
-
class Core(object):
'''The Core object is the container for all Bcfg2 Server logic, and modules'''
@@ -203,7 +28,7 @@ class Core(object):
object.__init__(self)
self.datastore = repo
try:
- self.fam = monitor()
+ self.fam = Bcfg2.Server.FileMonitor.default()
except IOError:
raise CoreInitError, "failed to connect to fam"
self.pubspace = {}
diff --git a/src/lib/Server/FileMonitor.py b/src/lib/Server/FileMonitor.py
new file mode 100644
index 000000000..347e37b7b
--- /dev/null
+++ b/src/lib/Server/FileMonitor.py
@@ -0,0 +1,235 @@
+from time import time
+import logging, os, stat
+
+logger = logging.getLogger('Bcfg2.Server.FileMonitor')
+
+def ShouldIgnore(event):
+ '''Test if the event should be suppresed'''
+ # FIXME should move event suppression out of the core
+ if event.filename.split('/')[-1] == '.svn':
+ return True
+ if event.filename.endswith('~') or event.filename.endswith('.tmp') \
+ or event.filename.startswith('#') or event.filename.startswith('.#'):
+ logger.error("Suppressing event for file %s" % (event.filename))
+ return True
+ return False
+
+class FamFam(object):
+ '''The fam object is a set of callbacks for file alteration events (FAM support)'''
+
+ def __init__(self):
+ object.__init__(self)
+ self.fm = _fam.open()
+ self.users = {}
+ self.handles = {}
+ self.debug = False
+
+ def fileno(self):
+ '''return fam file handle number'''
+ return self.fm.fileno()
+
+ def AddMonitor(self, path, obj):
+ '''add a monitor to path, installing a callback to obj.HandleEvent'''
+ mode = os.stat(path)[stat.ST_MODE]
+ if stat.S_ISDIR(mode):
+ handle = self.fm.monitorDirectory(path, None)
+ else:
+ handle = self.fm.monitorFile(path, None)
+ self.handles[handle.requestID()] = handle
+ if obj != None:
+ self.users[handle.requestID()] = obj
+ return handle.requestID()
+
+ def Service(self):
+ '''Handle all fam work'''
+ count = 0
+ collapsed = 0
+ rawevents = []
+ start = time()
+ now = time()
+ while (time() - now) < 0.10:
+ if self.fm.pending():
+ while self.fm.pending():
+ count += 1
+ rawevents.append(self.fm.nextEvent())
+ now = time()
+ unique = []
+ bookkeeping = []
+ for event in rawevents:
+ if ShouldIgnore(event):
+ continue
+ if event.code2str() != 'changed':
+ # process all non-change events
+ unique.append(event)
+ else:
+ if (event.filename, event.requestID) not in bookkeeping:
+ bookkeeping.append((event.filename, event.requestID))
+ unique.append(event)
+ else:
+ collapsed += 1
+ for event in unique:
+ if self.users.has_key(event.requestID):
+ try:
+ self.users[event.requestID].HandleEvent(event)
+ except:
+ logger.error("handling event for file %s" % (event.filename), exc_info=1)
+ end = time()
+ logger.info("Processed %s fam events in %03.03f seconds. %s coalesced" %
+ (count, (end - start), collapsed))
+ return count
+
+class Event(object):
+ def __init(self, request_id, filename, code):
+ self.requestID = request_id
+ self.filename = filename
+ self.action = code
+
+ def code2str(self):
+ '''return static code for event'''
+ return self.action
+
+
+class GaminEvent(Event):
+ '''This class provides an event analogous to python-fam events based on gamin sources'''
+ def __init__(self, request_id, filename, code):
+ Event.__init__(self, request_id, filename, code)
+ action_map = {GAMCreated: 'created', GAMExists: 'exists', GAMChanged: 'changed',
+ GAMDeleted: 'deleted', GAMEndExist: 'endExist', GAMMoved: 'moved'}
+ if action_map.has_key(code):
+ self.action = action_map[code]
+
+class GaminFam(object):
+ '''The fam object is a set of callbacks for file alteration events (Gamin support)'''
+ def __init__(self):
+ object.__init__(self)
+ self.mon = WatchMonitor()
+ self.handles = {}
+ self.counter = 0
+ self.events = []
+ self.debug = False
+
+ def fileno(self):
+ '''return fam file handle number'''
+ return self.mon.get_fd()
+
+ def queue(self, path, action, request_id):
+ '''queue up the event for later handling'''
+ self.events.append(GaminEvent(request_id, path, action))
+
+ def AddMonitor(self, path, obj):
+ '''add a monitor to path, installing a callback to obj.HandleEvent'''
+ handle = self.counter
+ self.counter += 1
+ mode = os.stat(path)[stat.ST_MODE]
+ if stat.S_ISDIR(mode):
+ self.mon.watch_directory(path, self.queue, handle)
+ else:
+ self.mon.watch_file(path, self.queue, handle)
+ self.handles[handle] = obj
+ return handle
+
+ def Service(self):
+ '''Handle all gamin work'''
+ count = 0
+ collapsed = 0
+ start = time()
+ now = time()
+ while (time() - now) < 0.10:
+ if self.mon.event_pending():
+ while self.mon.event_pending():
+ count += 1
+ self.mon.handle_one_event()
+ now = time()
+ unique = []
+ bookkeeping = []
+ for event in self.events:
+ if ShouldIgnore(event):
+ continue
+ if event.code2str() != 'changed':
+ # process all non-change events
+ unique.append(event)
+ else:
+ if (event.filename, event.requestID) not in bookkeeping:
+ bookkeeping.append((event.filename, event.requestID))
+ unique.append(event)
+ else:
+ collapsed += 1
+ self.events = []
+ for event in unique:
+ if event.requestID not in self.handles:
+ logger.info("Got event for unexpected id %s, file %s" %
+ (event.requestID, event.filename))
+ continue
+ if self.debug:
+ logger.info("Dispatching event %s %s to obj %s" \
+ % (event.code2str(), event.filename,
+ self.handles[event.requestID]))
+ try:
+ self.handles[event.requestID].HandleEvent(event)
+ except:
+ logger.error("error in handling of gamin event for %s" % \
+ (event.filename), exc_info=1)
+ end = time()
+ logger.info("Processed %s gamin events in %03.03f seconds. %s collapsed" %
+ (count, (end - start), collapsed))
+ return count
+
+class PseudoFam(object):
+ '''The fam object is a set of callbacks for file alteration events (FAM support)'''
+
+ def __init__(self):
+ object.__init__(self)
+ self.users = {}
+ self.handles = {}
+ self.debug = False
+ self.pending = []
+
+ def AddMonitor(self, path, obj):
+ '''add a monitor to path, installing a callback to obj.HandleEvent'''
+ handleID = len(self.handles.keys())
+ mode = os.stat(path)[stat.ST_MODE]
+ handle = GaminEvent(handleID, path, 'exists')
+ if stat.S_ISDIR(mode):
+ dirList = os.listdir(path)
+ self.pending.append(handle)
+ for includedFile in dirList:
+ self.pending.append(GaminEvent(handleID, includedFile, 'exists'))
+ self.pending.append(GaminEvent(handleID, path, 'endExist'))
+ else:
+ self.pending.append(GaminEvent(handleID, path, 'exists'))
+ self.handles[handleID] = handle
+ if obj != None:
+ self.users[handleID] = obj
+ return handleID
+
+ def Service(self):
+ '''Handle all fam work'''
+ count = 0
+ rawevents = []
+ for event in self.pending:
+ count += 1
+ rawevents.append(event)
+ self.pending = []
+ for event in rawevents:
+ if self.users.has_key(event.requestID):
+ self.users[event.requestID].HandleEvent(event)
+ return count
+
+available = {}
+try:
+ from gamin import WatchMonitor, GAMCreated, GAMExists, GAMEndExist, GAMChanged, GAMDeleted, GAMMoved
+ available['gamin'] = GaminFam
+except ImportError:
+ # fall back to _fam
+ pass
+try:
+ import _fam
+ available['fam'] = FamFam
+except ImportError:
+ pass
+available['pseudo'] = PseudoFam
+
+for fdrv in ['gamin', 'fam', 'pseudo']:
+ if fdrv in available:
+ default = available[fdrv]
+ break