diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2012-08-15 09:06:43 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2012-08-15 09:06:43 -0400 |
commit | b862090945322d5ba4b42e180bba92afb860df21 (patch) | |
tree | 1c482adfa9561bad14d82fc442f8f319b33b1d4f | |
parent | 7890fd0aa5331541c71b893c313553765ca1628e (diff) | |
download | bcfg2-b862090945322d5ba4b42e180bba92afb860df21.tar.gz bcfg2-b862090945322d5ba4b42e180bba92afb860df21.tar.bz2 bcfg2-b862090945322d5ba4b42e180bba92afb860df21.zip |
POSIX:
refactored POSIX tool into multiple files to make it more manageable
Added unit tests for POSIX tool and sub-tools
fixed ACL handling for filesystems mounted noacl
23 files changed, 3396 insertions, 1259 deletions
diff --git a/schemas/types.xsd b/schemas/types.xsd index 83cc2c9ee..edbc8ad37 100644 --- a/schemas/types.xsd +++ b/schemas/types.xsd @@ -140,7 +140,6 @@ <xsd:attribute type="DeviceTypeEnum" name="dev_type"/> <xsd:attribute type="xsd:integer" name="major"/> <xsd:attribute type="xsd:integer" name="minor"/> - <xsd:attribute type="xsd:string" name="mode"/> <xsd:attribute type="xsd:string" name="perms"/> <xsd:attribute type="xsd:string" name="owner"/> <xsd:attribute type="xsd:string" name="group"/> diff --git a/src/lib/Bcfg2/Client/Frame.py b/src/lib/Bcfg2/Client/Frame.py index b456738d0..bfdc90d38 100644 --- a/src/lib/Bcfg2/Client/Frame.py +++ b/src/lib/Bcfg2/Client/Frame.py @@ -84,7 +84,7 @@ class Frame: self.whitelist = [] self.blacklist = [] self.removal = [] - self.logger = logging.getLogger("Bcfg2.Client.Frame") + self.logger = logging.getLogger(__name__) for driver in drivers[:]: if driver not in Bcfg2.Client.Tools.drivers and \ isinstance(driver, str): diff --git a/src/lib/Bcfg2/Client/Tools/POSIX.py b/src/lib/Bcfg2/Client/Tools/POSIX.py deleted file mode 100644 index 64ea1b3e8..000000000 --- a/src/lib/Bcfg2/Client/Tools/POSIX.py +++ /dev/null @@ -1,1239 +0,0 @@ -"""All POSIX Type client support for Bcfg2.""" - -import binascii -from datetime import datetime -import difflib -import errno -import grp -import logging -import os -import pwd -import shutil -import stat -import sys -import time -# py3k compatibility -if sys.hexversion >= 0x03000000: - unicode = str - -import Bcfg2.Client.Tools -import Bcfg2.Options -from Bcfg2.Client import XML - -log = logging.getLogger(__name__) - -try: - import selinux - has_selinux = True -except ImportError: - has_selinux = False - -try: - import posix1e - has_acls = True -except ImportError: - has_acls = False - - -# map between dev_type attribute and stat constants -device_map = {'block': stat.S_IFBLK, - 'char': stat.S_IFCHR, - 'fifo': stat.S_IFIFO} - -# map between permissions characters and numeric ACL constants -acl_map = dict(r=posix1e.ACL_READ, - w=posix1e.ACL_WRITE, - x=posix1e.ACL_EXECUTE) - - -class POSIX(Bcfg2.Client.Tools.Tool): - """POSIX File support code.""" - name = 'POSIX' - __handles__ = [('Path', 'device'), - ('Path', 'directory'), - ('Path', 'file'), - ('Path', 'hardlink'), - ('Path', 'nonexistent'), - ('Path', 'permissions'), - ('Path', 'symlink')] - __req__ = dict(Path=dict( - device=['name', 'dev_type', 'perms', 'owner', 'group'], - directory=['name', 'perms', 'owner', 'group'], - file=['name', 'perms', 'owner', 'group'], - hardlink=['name', 'to'], - nonexistent=['name'], - permissions=['name', 'perms', 'owner', 'group'], - symlink=['name', 'to'])) - - # grab paranoid options from /etc/bcfg2.conf - opts = {'ppath': Bcfg2.Options.PARANOID_PATH, - 'max_copies': Bcfg2.Options.PARANOID_MAX_COPIES} - setup = Bcfg2.Options.OptionParser(opts) - setup.parse([]) - ppath = setup['ppath'] - max_copies = setup['max_copies'] - - def canInstall(self, entry): - """Check if entry is complete for installation.""" - if Bcfg2.Client.Tools.Tool.canInstall(self, entry): - if (entry.get('type') == 'file' and - entry.text is None and - entry.get('empty', 'false') == 'false'): - return False - return True - else: - return False - - def gatherCurrentData(self, entry): - if entry.tag == 'Path' and entry.get('type') == 'file': - try: - ondisk = os.stat(entry.get('name')) - except OSError: - entry.set('current_exists', 'false') - self.logger.debug("%s %s does not exist" % - (entry.tag, entry.get('name'))) - return False - try: - entry.set('current_owner', str(ondisk[stat.ST_UID])) - entry.set('current_group', str(ondisk[stat.ST_GID])) - except (OSError, KeyError): - pass - - if has_selinux: - try: - entry.set('current_secontext', - selinux.getfilecon(entry.get('name'))[1]) - except (OSError, KeyError): - pass - entry.set('perms', str(oct(ondisk[stat.ST_MODE])[-4:])) - - def _set_perms(self, entry, path=None): - if path is None: - path = entry.get("name") - - if (entry.get('perms') == None or - entry.get('owner') == None or - entry.get('group') == None): - self.logger.error('Entry %s not completely specified. ' - 'Try running bcfg2-lint.' % entry.get('name')) - return False - - rv = True - # split this into multiple try...except blocks so that even if a - # chown fails, the chmod can succeed -- get as close to the - # desired state as we can - try: - self.logger.debug("Setting ownership of %s to %s:%s" % - (path, - self._norm_entry_uid(entry), - self._norm_entry_gid(entry))) - os.chown(path, self._norm_entry_uid(entry), - self._norm_entry_gid(entry)) - except KeyError: - self.logger.error('Failed to change ownership of %s' % path) - rv = False - os.chown(path, 0, 0) - except OSError: - self.logger.error('Failed to change ownership of %s' % path) - rv = False - - configPerms = int(entry.get('perms'), 8) - if entry.get('dev_type'): - configPerms |= device_map[entry.get('dev_type')] - try: - self.logger.debug("Setting permissions on %s to %s" % - (path, oct(configPerms))) - os.chmod(path, configPerms) - except (OSError, KeyError): - self.logger.error('Failed to change permissions mode of %s' % path) - rv = False - - recursive = entry.get("recursive", "false").lower() == "true" - return (self._set_secontext(entry, path=path, recursive=recursive) and - self._set_acls(entry, path=path, recursive=recursive) and - rv) - - def _set_acls(self, entry, path=None, recursive=True): - """ set POSIX ACLs on the file on disk according to the config """ - if not has_acls: - if entry.findall("ACL"): - self.logger.debug("ACLs listed for %s but no pylibacl library " - "installed" % entry.get('name')) - return True - - if path is None: - path = entry.get("name") - - acl = posix1e.ACL(file=path) - # clear ACLs out so we start fresh -- way easier than trying - # to add/remove/modify ACLs - for aclentry in acl: - if aclentry.tag_type in [posix1e.ACL_USER, posix1e.ACL_GROUP]: - acl.delete_entry(aclentry) - if os.path.isdir(path): - defacl = posix1e.ACL(filedef=path) - if not defacl.valid(): - # when a default ACL is queried on a directory that - # has no default ACL entries at all, you get an empty - # ACL, which is not valid. in this circumstance, we - # just copy the access ACL to get a base valid ACL - # that we can add things to. - defacl = posix1e.ACL(acl=acl) - else: - for aclentry in defacl: - if aclentry.tag_type in [posix1e.ACL_USER, - posix1e.ACL_GROUP]: - defacl.delete_entry(aclentry) - else: - defacl = None - - for aclkey, perms in self._list_entry_acls(entry).items(): - atype, scope, qualifier = aclkey - if atype == "default": - if defacl is None: - self.logger.warning("Cannot set default ACLs on " - "non-directory %s" % path) - continue - entry = posix1e.Entry(defacl) - else: - entry = posix1e.Entry(acl) - for perm in acl_map.values(): - if perm & perms: - entry.permset.add(perm) - entry.tag_type = scope - try: - if scope == posix1e.ACL_USER: - scopename = "user" - entry.qualifier = self._norm_uid(qualifier) - elif scope == posix1e.ACL_GROUP: - scopename = "group" - entry.qualifier = self._norm_gid(qualifier) - except (OSError, KeyError): - err = sys.exc_info()[1] - self.logger.error("Could not resolve %s %s: %s" % - (scopename, qualifier, err)) - continue - acl.calc_mask() - - def _apply_acl(acl, path, atype=posix1e.ACL_TYPE_ACCESS): - if atype == posix1e.ACL_TYPE_ACCESS: - atype_str = "access" - else: - atype_str = "default" - if acl.valid(): - self.logger.debug("Applying %s ACL to %s:" % (atype_str, path)) - for line in str(acl).splitlines(): - self.logger.debug(" " + line) - try: - acl.applyto(path, atype) - return True - except: - err = sys.exc_info()[1] - self.logger.error("Failed to set ACLs on %s: %s" % - (path, err)) - return False - else: - self.logger.warning("%s ACL created for %s was invalid:" % - (atype_str.title(), path)) - for line in str(acl).splitlines(): - self.logger.warning(" " + line) - return False - - rv = _apply_acl(acl, path) - if defacl: - defacl.calc_mask() - rv &= _apply_acl(defacl, path, posix1e.ACL_TYPE_DEFAULT) - if recursive: - for root, dirs, files in os.walk(path): - for p in dirs + files: - rv &= _apply_acl(acl, p) - if defacl: - rv &= _apply_acl(defacl, p, posix1e.ACL_TYPE_DEFAULT) - return rv - - def _set_secontext(self, entry, path=None, recursive=False): - """ set the SELinux context of the file on disk according to the - config""" - if not has_selinux: - return True - - if path is None: - path = entry.get("name") - context = entry.get("secontext") - if context is None: - # no context listed - return True - - rv = True - if context == '__default__': - try: - selinux.restorecon(path, recursive=recursive) - except: - err = sys.exc_info()[1] - self.logger.error("Failed to restore SELinux context for %s: %s" - % (path, err)) - rv = False - else: - try: - rv &= selinux.lsetfilecon(path, context) == 0 - except: - err = sys.exc_info()[1] - self.logger.error("Failed to restore SELinux context for %s: %s" - % (path, err)) - rv = False - - if recursive: - for root, dirs, files in os.walk(path): - for p in dirs + files: - try: - rv &= selinux.lsetfilecon(p, context) == 0 - except: - err = sys.exc_info()[1] - self.logger.error("Failed to restore SELinux " - "context for %s: %s" % - (path, err)) - rv = False - return rv - - def _secontext_matches(self, entry): - """ determine if the SELinux context of the file on disk matches - the desired context """ - if not has_selinux: - # no selinux libraries - return True - - path = entry.get("path") - context = entry.get("secontext") - if context is None: - # no context listed - return True - - if context == '__default__': - if selinux.getfilecon(entry.get('name'))[1] == \ - selinux.matchpathcon(entry.get('name'), 0)[1]: - return True - else: - return False - elif selinux.getfilecon(entry.get('name'))[1] == context: - return True - else: - return False - - def _norm_gid(self, gid): - """ This takes a group name or gid and returns the - corresponding gid. """ - try: - return int(gid) - except ValueError: - return int(grp.getgrnam(gid)[2]) - - def _norm_entry_gid(self, entry): - try: - return self._norm_gid(entry.get('group')) - except (OSError, KeyError): - err = sys.exc_info()[1] - self.logger.error('GID normalization failed for %s on %s: %s' % - (entry.get('group'), entry.get('name'), err)) - return False - - def _norm_uid(self, uid): - """ This takes a username or uid and returns the - corresponding uid. """ - try: - return int(uid) - except ValueError: - return int(pwd.getpwnam(uid)[2]) - - def _norm_entry_uid(self, entry): - try: - return self._norm_uid(entry.get("owner")) - except (OSError, KeyError): - err = sys.exc_info()[1] - self.logger.error('UID normalization failed for %s on %s: %s' % - (entry.get('owner'), entry.get('name'), err)) - return False - - def _norm_acl_perms(self, perms): - """ takes a representation of an ACL permset and returns a digit - representing the permissions entailed by it. representations can - either be a single octal digit, a string of up to three 'r', - 'w', 'x', or '-' characters, or a posix1e.Permset object""" - if hasattr(perms, 'test'): - # Permset object - return sum([p for p in acl_map.values() - if perms.test(p)]) - - try: - # single octal digit - return int(perms) - except ValueError: - # couldn't be converted to an int; process as a string - rv = 0 - for char in perms: - if char == '-': - continue - elif char not in acl_map: - self.logger.error("Unknown permissions character in ACL: %s" - % char) - return 0 - else: - rv |= acl_map[char] - return rv - - def _acl2string(self, aclkey, perms): - atype, scope, qualifier = aclkey - acl_str = [] - if atype == 'default': - acl_str.append(atype) - if scope == posix1e.ACL_USER: - acl_str.append("user") - elif scope == posix1e.ACL_GROUP: - acl_str.append("group") - acl_str.append(qualifier) - acl_str.append(self._acl_perm2string(perms)) - return ":".join(acl_str) - - def _acl_perm2string(self, perm): - rv = [] - for char in 'rwx': - if acl_map[char] & perm: - rv.append(char) - else: - rv.append('-') - return ''.join(rv) - - def _is_string(self, strng, encoding): - """ Returns true if the string contains no ASCII control - characters and can be decoded from the specified encoding. """ - for char in strng: - if ord(char) < 9 or ord(char) > 13 and ord(char) < 32: - return False - try: - strng.decode(encoding) - return True - except: - return False - - def Verifydevice(self, entry, _): - """Verify device entry.""" - if entry.get('dev_type') in ['block', 'char']: - # check if major/minor are properly specified - if (entry.get('major') == None or - entry.get('minor') == None): - self.logger.error('Entry %s not completely specified. ' - 'Try running bcfg2-lint.' % - (entry.get('name'))) - return False - - try: - ondisk = os.stat(path) - except OSError: - entry.set('current_exists', 'false') - self.logger.debug("%s %s does not exist" % - (entry.tag, path)) - return False - - rv = self._verify_metadata(entry) - - # attempt to verify device properties as specified in config - dev_type = entry.get('dev_type') - if dev_type in ['block', 'char']: - major = int(entry.get('major')) - minor = int(entry.get('minor')) - if major != os.major(ondisk.st_rdev): - entry.set('current_mtime', mtime) - msg = ("Major number for device %s is incorrect. " - "Current major is %s but should be %s" % - (path, os.major(ondisk.st_rdev), major)) - self.logger.debug(msg) - entry.set('qtext', entry.get('qtext') + "\n" + msg) - rv = False - - if minor != os.minor(ondisk.st_rdev): - entry.set('current_mtime', mtime) - msg = ("Minor number for device %s is incorrect. " - "Current minor is %s but should be %s" % - (path, os.minor(ondisk.st_rdev), minor)) - self.logger.debug(msg) - entry.set('qtext', entry.get('qtext') + "\n" + msg) - rv = False - - return rv - - def Installdevice(self, entry): - """Install device entries.""" - try: - # check for existing paths and remove them - os.lstat(entry.get('name')) - try: - os.unlink(entry.get('name')) - exists = False - except OSError: - self.logger.info('Failed to unlink %s' % - entry.get('name')) - return False - except OSError: - exists = False - - if not exists: - try: - dev_type = entry.get('dev_type') - mode = device_map[dev_type] | int(entry.get('mode', '0600'), 8) - if dev_type in ['block', 'char']: - # check if major/minor are properly specified - if (entry.get('major') == None or - entry.get('minor') == None): - self.logger.error('Entry %s not completely specified. ' - 'Try running bcfg2-lint.' % - entry.get('name')) - return False - major = int(entry.get('major')) - minor = int(entry.get('minor')) - device = os.makedev(major, minor) - os.mknod(entry.get('name'), mode, device) - else: - os.mknod(entry.get('name'), mode) - return self._set_perms(entry) - except KeyError: - self.logger.error('Failed to install %s' % entry.get('name')) - except OSError: - self.logger.error('Failed to install %s' % entry.get('name')) - return False - - def Verifydirectory(self, entry, modlist): - """Verify Path type='directory' entry.""" - pruneTrue = True - ex_ents = [] - if (entry.get('prune', 'false') == 'true' - and (entry.tag == 'Path' and entry.get('type') == 'directory')): - # check for any extra entries when prune='true' attribute is set - try: - entries = ['/'.join([entry.get('name'), ent]) - for ent in os.listdir(entry.get('name'))] - ex_ents = [e for e in entries if e not in modlist] - if ex_ents: - pruneTrue = False - self.logger.info("POSIX: Directory %s contains " - "extra entries:" % entry.get('name')) - self.logger.info(ex_ents) - nqtext = entry.get('qtext', '') + '\n' - nqtext += "Directory %s contains extra entries: " % \ - entry.get('name') - nqtext += ":".join(ex_ents) - entry.set('qtext', nqtext) - [entry.append(XML.Element('Prune', path=x)) - for x in ex_ents] - except OSError: - ex_ents = [] - pruneTrue = True - - return pruneTrue and self._verify_metadata(entry) - - def Installdirectory(self, entry): - """Install Path type='directory' entry.""" - self.logger.info("Installing directory %s" % entry.get('name')) - try: - fmode = os.lstat(entry.get('name')) - except OSError: - # stat failed - exists = False - - if not stat.S_ISDIR(fmode[stat.ST_MODE]): - self.logger.debug("Found a non-directory entry at %s" % - entry.get('name')) - try: - os.unlink(entry.get('name')) - exists = False - except OSError: - self.logger.info("Failed to unlink %s" % entry.get('name')) - return False - else: - self.logger.debug("Found a pre-existing directory at %s" % - entry.get('name')) - exists = True - - if not exists: - parent = "/".join(entry.get('name').split('/')[:-1]) - if parent: - try: - os.stat(parent) - except: - self.logger.debug('Creating parent path for directory %s' % - entry.get('name')) - for idx in range(len(parent.split('/')[:-1])): - current = '/' + '/'.join(parent.split('/')[1:2+idx]) - try: - sloc = os.stat(current) - except OSError: - try: - os.mkdir(current) - continue - except OSError: - return False - if not stat.S_ISDIR(sloc[stat.ST_MODE]): - try: - os.unlink(current) - os.mkdir(current) - except OSError: - return False - - try: - os.mkdir(entry.get('name')) - except OSError: - self.logger.error('Failed to create directory %s' % - entry.get('name')) - return False - if entry.get('prune', 'false') == 'true' and entry.get("qtext"): - for pent in entry.findall('Prune'): - pname = pent.get('path') - ulfailed = False - if os.path.isdir(pname): - self.logger.info("Not removing extra directory %s, " - "please check and remove manually" % pname) - continue - try: - self.logger.debug("Unlinking file %s" % pname) - os.unlink(pname) - except OSError: - self.logger.error("Failed to unlink path %s" % pname) - ulfailed = True - if ulfailed: - return False - return self.Installpermissions(entry) - - def Verifyfile(self, entry, _): - """Verify Path type='file' entry.""" - # permissions check + content check - permissionStatus = self._verify_metadata(entry) - tbin = False - if entry.text == None and entry.get('empty', 'false') == 'false': - self.logger.error("Cannot verify incomplete Path type='%s' %s" % - (entry.get('type'), entry.get('name'))) - return False - if entry.get('encoding', 'ascii') == 'base64': - tempdata = binascii.a2b_base64(entry.text) - tbin = True - elif entry.get('empty', 'false') == 'true': - tempdata = '' - else: - tempdata = entry.text - if type(tempdata) == unicode: - try: - tempdata = tempdata.encode(self.setup['encoding']) - except UnicodeEncodeError: - e = sys.exc_info()[1] - self.logger.error("Error encoding file %s:\n %s" % - (entry.get('name'), e)) - - different = False - content = None - if not os.path.exists(entry.get("name")): - # first, see if the target file exists at all; if not, - # they're clearly different - different = True - content = "" - else: - # next, see if the size of the target file is different - # from the size of the desired content - try: - estat = os.stat(entry.get('name')) - except OSError: - err = sys.exc_info()[1] - self.logger.error("Failed to stat %s: %s" % - (err.filename, err)) - return False - if len(tempdata) != estat[stat.ST_SIZE]: - different = True - else: - # finally, read in the target file and compare them - # directly. comparison could be done with a checksum, - # which might be faster for big binary files, but - # slower for everything else - try: - content = open(entry.get('name')).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Failed to read %s: %s" % - (err.filename, err)) - return False - different = content != tempdata - - if different: - if self.setup['interactive']: - prompt = [entry.get('qtext', '')] - if not tbin and content is None: - # it's possible that we figured out the files are - # different without reading in the local file. if - # the supplied version of the file is not binary, - # we now have to read in the local file to figure - # out if _it_ is binary, and either include that - # fact or the diff in our prompts for -I - try: - content = open(entry.get('name')).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Failed to read %s: %s" % - (err.filename, err)) - return False - if tbin or not self._is_string(content, self.setup['encoding']): - # don't compute diffs if the file is binary - prompt.append('Binary file, no printable diff') - else: - diff = self._diff(content, tempdata, - difflib.unified_diff, - filename=entry.get("name")) - if diff: - udiff = '\n'.join(diff) - try: - prompt.append(udiff.decode(self.setup['encoding'])) - except UnicodeDecodeError: - prompt.append("Binary file, no printable diff") - else: - prompt.append("Diff took too long to compute, no " - "printable diff") - entry.set("qtext", "\n".join(prompt)) - - if entry.get('sensitive', 'false').lower() != 'true': - if content is None: - # it's possible that we figured out the files are - # different without reading in the local file. we - # now have to read in the local file to figure out - # if _it_ is binary, and either include the whole - # file or the diff for reports - try: - content = open(entry.get('name')).read() - except IOError: - err = sys.exc_info()[1] - self.logger.error("Failed to read %s: %s" % - (err.filename, err)) - return False - - if tbin or not self._is_string(content, self.setup['encoding']): - # don't compute diffs if the file is binary - entry.set('current_bfile', binascii.b2a_base64(content)) - else: - diff = self._diff(content, tempdata, difflib.ndiff, - filename=entry.get("name")) - if diff: - entry.set("current_bdiff", - binascii.b2a_base64("\n".join(diff))) - elif not tbin and self._is_string(content, - self.setup['encoding']): - entry.set('current_bfile', binascii.b2a_base64(content)) - - return permissionStatus and not different - - def Installfile(self, entry): - """Install Path type='file' entry.""" - self.logger.info("Installing file %s" % (entry.get('name'))) - - parent = "/".join(entry.get('name').split('/')[:-1]) - if parent: - try: - os.stat(parent) - except: - self.logger.debug('Creating parent path for config file %s' % - entry.get('name')) - current = '/' - for next in parent.split('/')[1:]: - current += next + '/' - try: - sloc = os.stat(current) - try: - if not stat.S_ISDIR(sloc[stat.ST_MODE]): - self.logger.debug('%s is not a directory; recreating' - % current) - os.unlink(current) - os.mkdir(current) - except OSError: - return False - except OSError: - try: - self.logger.debug("Creating non-existent path %s" % - current) - os.mkdir(current) - except OSError: - return False - - # If we get here, then the parent directory should exist - if (entry.get("paranoid", 'false').lower() == 'true' and - self.setup.get("paranoid", False) and - entry.get('current_exists', 'true') != 'false'): - bkupnam = entry.get('name').replace('/', '_') - # current list of backups for this file - try: - bkuplist = [f for f in os.listdir(self.ppath) if - f.startswith(bkupnam)] - except OSError: - e = sys.exc_info()[1] - self.logger.error("Failed to create backup list in %s: %s" % - (self.ppath, e.strerror)) - return False - bkuplist.sort() - while len(bkuplist) >= int(self.max_copies): - # remove the oldest backup available - oldest = bkuplist.pop(0) - self.logger.info("Removing %s" % oldest) - try: - os.remove("%s/%s" % (self.ppath, oldest)) - except: - self.logger.error("Failed to remove %s/%s" % - (self.ppath, oldest)) - return False - try: - # backup existing file - shutil.copy(entry.get('name'), - "%s/%s_%s" % (self.ppath, bkupnam, - datetime.isoformat(datetime.now()))) - self.logger.info("Backup of %s saved to %s" % - (entry.get('name'), self.ppath)) - except IOError: - e = sys.exc_info()[1] - self.logger.error("Failed to create backup file for %s" % - entry.get('name')) - self.logger.error(e) - return False - try: - newfile = open("%s.new"%(entry.get('name')), 'w') - if entry.get('encoding', 'ascii') == 'base64': - filedata = binascii.a2b_base64(entry.text) - elif entry.get('empty', 'false') == 'true': - filedata = '' - else: - if type(entry.text) == unicode: - filedata = entry.text.encode(self.setup['encoding']) - else: - filedata = entry.text - newfile.write(filedata) - newfile.close() - - rv = self._set_perms(entry, newfile.name) - os.rename(newfile.name, entry.get('name')) - if entry.get('mtime'): - try: - os.utime(entry.get('name'), (int(entry.get('mtime')), - int(entry.get('mtime')))) - except: - self.logger.error("Failed to set mtime of %s" % path) - rv = False - return rv - except (OSError, IOError): - err = sys.exc_info()[1] - self.logger.error("Failed to open %s for writing: %s" % - (entry.get('name'), err)) - return False - - def Verifyhardlink(self, entry, _): - """Verify HardLink entry.""" - rv = True - - try: - if not os.path.samefile(entry.get('name'), entry.get('to')): - msg = "Hardlink %s is incorrect." % entry.get('name') - self.logger.debug(msg) - entry.set('qtext', "\n".join([entry.get('qtext', ''), msg])) - rv = False - except OSError: - entry.set('current_exists', 'false') - return False - - rv &= self._verify_secontext(entry) - return rv - - def Installhardlink(self, entry): - """Install HardLink entry.""" - self.logger.info("Installing Hardlink %s" % entry.get('name')) - if os.path.lexists(entry.get('name')): - try: - fmode = os.lstat(entry.get('name'))[stat.ST_MODE] - if stat.S_ISREG(fmode) or stat.S_ISLNK(fmode): - self.logger.debug("Non-directory entry already exists at " - "%s. Unlinking entry." % - entry.get('name')) - os.unlink(entry.get('name')) - elif stat.S_ISDIR(fmode): - self.logger.debug("Directory already exists at %s" % - entry.get('name')) - self.cmd.run("mv %s/ %s.bak" % (entry.get('name'), - entry.get('name'))) - else: - os.unlink(entry.get('name')) - except OSError: - self.logger.info("Hardlink %s cleanup failed" % \ - (entry.get('name'))) - try: - os.link(entry.get('to'), entry.get('name')) - return self._set_perms(entry) - except OSError: - return False - - def Verifynonexistent(self, entry, _): - """Verify nonexistent entry.""" - # return true if path does _not_ exist - return not os.path.lexists(entry.get('name')) - - def Installnonexistent(self, entry): - '''Remove nonexistent entries''' - ename = entry.get('name') - if entry.get('recursive').lower() == 'true': - # ensure that configuration spec is consistent first - if [e for e in self.buildModlist() \ - if e.startswith(ename) and e != ename]: - self.logger.error('Not installing %s. One or more files ' - 'in this directory are specified in ' - 'your configuration.' % ename) - return False - try: - shutil.rmtree(ename) - except OSError: - e = sys.exc_info()[1] - self.logger.error('Failed to remove %s: %s' % (ename, - e.strerror)) - else: - if os.path.islink(ename): - os.remove(ename) - return True - elif os.path.isdir(ename): - try: - os.rmdir(ename) - return True - except OSError: - e = sys.exc_info()[1] - self.logger.error('Failed to remove %s: %s' % (ename, - e.strerror)) - return False - try: - os.remove(ename) - return True - except OSError: - e = sys.exc_info()[1] - self.logger.error('Failed to remove %s: %s' % (ename, - e.strerror)) - return False - - def Verifypermissions(self, entry, _): - """Verify Path type='permissions' entry""" - rv = self._verify_metadata(entry) - - if entry.get('recursive', 'false').lower() == 'true': - # verify ownership information recursively - for root, dirs, files in os.walk(entry.get('name')): - for p in dirs + files: - rv &= self._verify_metadata(entry, - path=os.path.join(root, p)) - return rv - - def Installpermissions(self, entry): - """Install POSIX permissions""" - plist = [entry.get('name')] - if entry.get('recursive', 'false').lower() == 'true': - # verify ownership information recursively - for root, dirs, files in os.walk(entry.get('name')): - for p in dirs + files: - if not self._verify_metadata(entry, - path=os.path.join(root, p), - checkonly=True): - plist.append(path) - rv = True - for path in plist: - rv &= self._set_perms(entry, path) - return rv - - def Verifysymlink(self, entry, _): - """Verify Path type='symlink' entry.""" - if entry.get('to') == None: - self.logger.error('Entry %s not completely specified. ' - 'Try running bcfg2-lint.' % - (entry.get('name'))) - return False - - rv = True - - try: - sloc = os.readlink(entry.get('name')) - if sloc != entry.get('to'): - entry.set('current_to', sloc) - msg = ("Symlink %s points to %s, should be %s" % - (entry.get('name'), sloc, entry.get('to'))) - self.logger.debug(msg) - entry.set('qtext', "\n".join([entry.get('qtext', ''), msg])) - rv = False - except OSError: - entry.set('current_exists', 'false') - return False - - rv &= self._verify_secontext(entry) - return rv - - def Installsymlink(self, entry): - """Install Path type='symlink' entry.""" - if entry.get('to') == None: - self.logger.error('Entry %s not completely specified. ' - 'Try running bcfg2-lint.' % entry.get('name')) - return False - self.logger.info("Installing symlink %s" % (entry.get('name'))) - if os.path.lexists(entry.get('name')): - try: - fmode = os.lstat(entry.get('name'))[stat.ST_MODE] - if stat.S_ISREG(fmode) or stat.S_ISLNK(fmode): - self.logger.debug("Non-directory entry already exists at " - "%s. Unlinking entry." % - entry.get('name')) - os.unlink(entry.get('name')) - elif stat.S_ISDIR(fmode): - self.logger.debug("Directory already exists at %s" % - entry.get('name')) - self.cmd.run("mv %s/ %s.bak" % (entry.get('name'), - entry.get('name'))) - else: - os.unlink(entry.get('name')) - except OSError: - self.logger.info("Symlink %s cleanup failed" % - (entry.get('name'))) - try: - os.symlink(entry.get('to'), entry.get('name')) - return self._set_setcontext(entry) - except OSError: - return False - - def InstallPath(self, entry): - """Dispatch install to the proper method according to type""" - ret = getattr(self, 'Install%s' % entry.get('type')) - return ret(entry) - - def VerifyPath(self, entry, _): - """Dispatch verify to the proper method according to type""" - ret = getattr(self, 'Verify%s' % entry.get('type'))(entry, _) - if entry.get('qtext') and self.setup['interactive']: - entry.set('qtext', - '%s\nInstall %s %s: (y/N) ' % - (entry.get('qtext'), - entry.get('type'), entry.get('name'))) - return ret - - def _verify_metadata(self, entry, path=None, checkonly=False): - """ generic method to verify perms, owner, group, secontext, - and mtime """ - - # allow setting an alternate path for recursive permissions checking - if path is None: - path = entry.get('name') - - while len(entry.get('perms', '')) < 4: - entry.set('perms', '0' + entry.get('perms', '')) - - try: - ondisk = os.stat(path) - except OSError: - entry.set('current_exists', 'false') - self.logger.debug("POSIX: %s %s does not exist" % - (entry.tag, path)) - return False - - try: - owner = str(ondisk[stat.ST_UID]) - group = str(ondisk[stat.ST_GID]) - except (OSError, KeyError): - self.logger.error('POSIX: User/Group resolution failed for path %s' - % path) - owner = 'root' - group = '0' - - perms = oct(ondisk[stat.ST_MODE])[-4:] - if entry.get('mtime', '-1') != '-1': - mtime = str(ondisk[stat.ST_MTIME]) - else: - mtime = '-1' - - configOwner = str(self._norm_entry_uid(entry)) - configGroup = str(self._norm_entry_gid(entry)) - configPerms = int(entry.get('perms'), 8) - if entry.get('dev_type'): - configPerms |= device_map[entry.get('dev_type')] - if has_selinux: - if entry.get("secontext") == "__default__": - try: - configContext = selinux.matchpathcon(path, 0)[1] - except OSError: - self.logger.warning("Failed to get default SELinux context " - "for %s; missing fcontext rule?" % - path) - return False - else: - configContext = entry.get("secontext") - - errors = [] - if owner != configOwner: - if checkonly: - return False - entry.set('current_owner', owner) - errors.append("POSIX: Owner for path %s is incorrect. " - "Current owner is %s but should be %s" % - (path, ondisk.st_uid, entry.get('owner'))) - - if group != configGroup: - if checkonly: - return False - entry.set('current_group', group) - errors.append("POSIX: Group for path %s is incorrect. " - "Current group is %s but should be %s" % - (path, ondisk.st_gid, entry.get('group'))) - - if oct(int(perms, 8)) != oct(configPerms): - if checkonly: - return False - entry.set('current_perms', perms) - errors.append("POSIX: Permissions for path %s are incorrect. " - "Current permissions are %s but should be %s" % - (path, perms, entry.get('perms'))) - - if entry.get('mtime') and mtime != entry.get('mtime', '-1'): - if checkonly: - return False - entry.set('current_mtime', mtime) - errors.append("POSIX: mtime for path %s is incorrect. " - "Current mtime is %s but should be %s" % - (path, mtime, entry.get('mtime'))) - - seVerifies = self._verify_secontext(entry) - aclVerifies = self._verify_acls(entry) - - if errors: - for error in errors: - self.logger.debug(error) - entry.set('qtext', "\n".join([entry.get('qtext', '')] + errors)) - return False - else: - return seVerifies and aclVerifies - - def _list_entry_acls(self, entry): - wanted = dict() - for acl in entry.findall("ACL"): - if acl.get("scope") == "user": - scope = posix1e.ACL_USER - elif acl.get("scope") == "group": - scope = posix1e.ACL_GROUP - else: - self.logger.error("Unknown ACL scope %s" % acl.get("scope")) - continue - wanted[(acl.get("type"), scope, acl.get(acl.get("scope")))] = \ - self._norm_acl_perms(acl.get('perms')) - return wanted - - def _list_file_acls(self, entry): - def _process_acl(acl, atype): - try: - if acl.tag_type == posix1e.ACL_USER: - qual = pwd.getpwuid(acl.qualifier)[0] - elif acl.tag_type == posix1e.ACL_GROUP: - qual = grp.getgrgid(acl.qualifier)[0] - else: - return - except (OSError, KeyError): - err = sys.exc_info()[1] - self.logger.error("Lookup of %s %s failed: %s" % - (scope, acl.qualifier, err)) - qual = acl.qualifier - existing[(atype, acl.tag_type, qual)] = \ - self._norm_acl_perms(acl.permset) - - existing = dict() - for acl in posix1e.ACL(file=entry.get("name")): - _process_acl(acl, "access") - if os.path.isdir(entry.get("name")): - for acl in posix1e.ACL(filedef=entry.get("name")): - _process_acl(acl, "default") - return existing - - def _verify_acls(self, entry): - if not has_acls: - if entry.findall("ACL"): - self.logger.debug("ACLs listed for %s but no pylibacl library " - "installed" % entry.get('name')) - return True - - # create lists of normalized representations of the ACLs we want - # and the ACLs we have. this will make them easier to compare - # than trying to mine that data out of the ACL objects and XML - # objects and compare it at the same time. - wanted = self._list_entry_acls(entry) - existing = self._list_file_acls(entry) - - missing = [] - extra = [] - wrong = [] - for aclkey, perms in wanted.items(): - acl_str = self._acl2string(aclkey, perms) - if aclkey not in existing: - missing.append(acl_str) - elif existing[aclkey] != perms: - wrong.append((acl_str, - self._acl2string(aclkey, existing[aclkey]))) - - for aclkey, perms in existing.items(): - if aclkey not in wanted: - extra.append(self._acl2string(aclkey, perms)) - - msg = [] - if missing: - msg.append("%s ACLs are missing: %s" % (len(missing), - ", ".join(missing))) - if wrong: - msg.append("%s ACLs are wrong: %s" % - (len(wrong), - "; ".join(["%s should be %s" % (e, w) - for w, e in wrong]))) - if extra: - msg.append("%s extra ACLs: %s" % (len(extra), ", ".join(extra))) - - if msg: - msg.insert(0, - "POSIX ACLs for path %s are incorrect." % - entry.get("name")) - self.logger.debug(msg[0]) - for line in msg[1:]: - self.logger.debug(" " + line) - entry.set('qtext', "\n".join([entry.get("qtext", '')] + msg)) - return False - return True - - def _verify_secontext(self, entry): - if not self._secontext_matches(entry): - path = entry.get("name") - if entry.get("secontext") == "__default__": - configContext = selinux.matchpathcon(path, 0)[1] - else: - configContext = entry.get("secontext") - pcontext = selinux.getfilecon(path)[1] - entry.set('current_secontext', pcontext) - msg = ("SELinux context for path %s is incorrect. " - "Current context is %s but should be %s" % - (path, pcontext, configContext)) - self.logger.debug("POSIX: " + msg) - entry.set('qtext', "\n".join([entry.get("qtext", ''), msg])) - return False - return True - - def _diff(self, content1, content2, difffunc, filename=None): - rv = [] - start = time.time() - longtime = False - for diffline in difffunc(content1.split('\n'), - content2.split('\n')): - now = time.time() - rv.append(diffline) - if now - start > 5 and not longtime: - if filename: - self.logger.info("Diff of %s taking a long time" % - filename) - else: - self.logger.info("Diff taking a long time") - longtime = True - elif now - start > 30: - if filename: - self.logger.error("Diff of %s took too long; giving up" % - filename) - else: - self.logger.error("Diff took too long; giving up") - return False - return rv diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Device.py b/src/lib/Bcfg2/Client/Tools/POSIX/Device.py new file mode 100644 index 000000000..b8fb0f4d0 --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/Device.py @@ -0,0 +1,62 @@ +import os +import sys +from base import POSIXTool, device_map + +class POSIXDevice(POSIXTool): + __req__ = ['name', 'dev_type', 'perms', 'owner', 'group'] + + def fully_specified(self, entry): + if entry.get('dev_type') in ['block', 'char']: + # check if major/minor are properly specified + if (entry.get('major') == None or + entry.get('minor') == None): + return False + return True + + def verify(self, entry, modlist): + """Verify device entry.""" + ondisk = self._exists(entry) + if not ondisk: + return False + + # attempt to verify device properties as specified in config + rv = True + dev_type = entry.get('dev_type') + if dev_type in ['block', 'char']: + major = int(entry.get('major')) + minor = int(entry.get('minor')) + if major != os.major(ondisk.st_rdev): + msg = ("Major number for device %s is incorrect. " + "Current major is %s but should be %s" % + (entry.get("name"), os.major(ondisk.st_rdev), major)) + self.logger.debug('POSIX: ' + msg) + entry.set('qtext', entry.get('qtext', '') + "\n" + msg) + rv = False + + if minor != os.minor(ondisk.st_rdev): + msg = ("Minor number for device %s is incorrect. " + "Current minor is %s but should be %s" % + (entry.get("name"), os.minor(ondisk.st_rdev), minor)) + self.logger.debug('POSIX: ' + msg) + entry.set('qtext', entry.get('qtext', '') + "\n" + msg) + rv = False + return POSIXTool.verify(self, entry, modlist) and rv + + def install(self, entry): + if not self._exists(entry, remove=True): + try: + dev_type = entry.get('dev_type') + mode = device_map[dev_type] | int(entry.get('perms'), 8) + if dev_type in ['block', 'char']: + major = int(entry.get('major')) + minor = int(entry.get('minor')) + device = os.makedev(major, minor) + os.mknod(entry.get('name'), mode, device) + else: + os.mknod(entry.get('name'), mode) + except (KeyError, OSError, ValueError): + err = sys.exc_info()[1] + self.logger.error('POSIX: Failed to install %s: %s' % + (entry.get('name'), err)) + return False + return POSIXTool.install(self, entry) diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Directory.py b/src/lib/Bcfg2/Client/Tools/POSIX/Directory.py new file mode 100644 index 000000000..4b0ad93ef --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/Directory.py @@ -0,0 +1,86 @@ +import os +import sys +import stat +import shutil +import Bcfg2.Client.XML +from base import POSIXTool + +class POSIXDirectory(POSIXTool): + __req__ = ['name', 'perms', 'owner', 'group'] + + def verify(self, entry, modlist): + ondisk = self._exists(entry) + if not ondisk: + return False + + if not stat.S_ISDIR(ondisk[stat.ST_MODE]): + self.logger.info("POSIX: %s is not a directory" % entry.get('name')) + return False + + pruneTrue = True + if entry.get('prune', 'false').lower() == 'true': + # check for any extra entries when prune='true' attribute is set + try: + extras = [os.path.join(entry.get('name'), ent) + for ent in os.listdir(entry.get('name')) + if os.path.join(entry.get('name'), + ent) not in modlist] + if extras: + pruneTrue = False + msg = "Directory %s contains extra entries: %s" % \ + (entry.get('name'), "; ".join(extras)) + self.logger.info("POSIX: " + msg) + entry.set('qtext', entry.get('qtext', '') + '\n' + msg) + for extra in extras: + Bcfg2.Client.XML.SubElement(entry, 'Prune', path=extra) + except OSError: + pruneTrue = True + + return POSIXTool.verify(self, entry, modlist) and pruneTrue + + def install(self, entry): + """Install device entries.""" + fmode = self._exists(entry) + + if fmode and not stat.S_ISDIR(fmode[stat.ST_MODE]): + self.logger.info("POSIX: Found a non-directory entry at %s, " + "removing" % entry.get('name')) + try: + os.unlink(entry.get('name')) + fmode = False + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to unlink %s: %s" % + (entry.get('name'), err)) + return False + elif fmode: + self.logger.debug("POSIX: Found a pre-existing directory at %s" % + entry.get('name')) + + rv = True + if not fmode: + rv &= self._makedirs(entry) + + if entry.get('prune', 'false') == 'true': + ulfailed = False + for pent in entry.findall('Prune'): + pname = pent.get('path') + ulfailed = False + if os.path.isdir(pname): + rm = shutil.rmtree + else: + rm = os.unlink + try: + self.logger.debug("POSIX: Removing %s" % pname) + rm(pname) + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to unlink %s: %s" % + (pname, err)) + ulfailed = True + if ulfailed: + # even if prune failed, we still want to install the + # entry to make sure that we get permissions and + # whatnot set + rv = False + return POSIXTool.install(self, entry) and rv diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/File.py b/src/lib/Bcfg2/Client/Tools/POSIX/File.py new file mode 100644 index 000000000..73ed2d8bf --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/File.py @@ -0,0 +1,219 @@ +import os +import sys +import stat +import time +import difflib +import binascii +import tempfile +from base import POSIXTool + +# py3k compatibility +if sys.hexversion >= 0x03000000: + unicode = str + +class POSIXFile(POSIXTool): + __req__ = ['name', 'perms', 'owner', 'group'] + + def fully_specified(self, entry): + return entry.text is not None or entry.get('empty', 'false') == 'true' + + def _is_string(self, strng, encoding): + """ Returns true if the string contains no ASCII control + characters and can be decoded from the specified encoding. """ + for char in strng: + if ord(char) < 9 or ord(char) > 13 and ord(char) < 32: + return False + try: + strng.decode(encoding) + return True + except: + return False + + def _get_data(self, entry): + is_binary = False + if entry.get('encoding', 'ascii') == 'base64': + tempdata = binascii.a2b_base64(entry.text) + is_binary = True + elif entry.get('empty', 'false') == 'true': + tempdata = '' + else: + tempdata = entry.text + if isinstance(tempdata, unicode): + try: + tempdata = tempdata.encode(self.setup['encoding']) + except UnicodeEncodeError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Error encoding file %s: %s" % + (entry.get('name'), err)) + return (tempdata, is_binary) + + def verify(self, entry, modlist): + ondisk = self._exists(entry) + tempdata, is_binary = self._get_data(entry) + + different = False + content = None + if not ondisk: + # first, see if the target file exists at all; if not, + # they're clearly different + different = True + content = "" + elif len(tempdata) != ondisk[stat.ST_SIZE]: + # next, see if the size of the target file is different + # from the size of the desired content + different = True + else: + # finally, read in the target file and compare them + # directly. comparison could be done with a checksum, + # which might be faster for big binary files, but slower + # for everything else + try: + content = open(entry.get('name')).read() + except IOError: + self.logger.error("POSIX: Failed to read %s: %s" % + (entry.get("name"), sys.exc_info()[1])) + return False + different = content != tempdata + + if different: + self.logger.debug("POSIX: %s has incorrect contents" % + entry.get("name")) + self._get_diffs( + entry, interactive=self.setup['interactive'], + sensitive=entry.get('sensitive', 'false').lower() == 'true', + is_binary=is_binary, content=content) + return POSIXTool.verify(self, entry, modlist) and not different + + def _write_tmpfile(self, entry): + filedata, _ = self._get_data(entry) + # get a temp file to write to that is in the same directory as + # the existing file in order to preserve any permissions + # protections on that directory, and also to avoid issues with + # /tmp set nosetuid while creating files that are supposed to + # be setuid + try: + (newfd, newfile) = \ + tempfile.mkstemp(prefix=os.path.basename(entry.get("name")), + dir=os.path.dirname(entry.get("name"))) + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to create temp file in %s: %s" % + (os.path.dirname(entry.get('name')), err)) + return False + try: + os.fdopen(newfd, 'w').write(filedata) + except (OSError, IOError): + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to open temp file %s for writing " + "%s: %s" % + (newfile, entry.get("name"), err)) + return False + return newfile + + def _rename_tmpfile(self, newfile, entry): + try: + os.rename(newfile, entry.get('name')) + return True + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to rename temp file %s to %s: %s" % + (newfile, entry.get('name'), err)) + try: + os.unlink(newfile) + except: + err = sys.exc_info()[1] + self.logger.error("POSIX: Could not remove temp file %s: %s" % + (newfile, err)) + return False + + def install(self, entry): + """Install device entries.""" + if not os.path.exists(os.path.dirname(entry.get('name'))): + if not self._makedirs(entry, + path=os.path.dirname(entry.get('name'))): + return False + newfile = self._write_tmpfile(entry) + if not newfile: + return False + rv = self._set_perms(entry, path=newfile) + if not self._rename_tmpfile(newfile, entry): + return False + + return POSIXTool.install(self, entry) and rv + + def _get_diffs(entry, interactive=False, sensitive=False, is_binary=False, + content=None): + if not interactive and sensitive: + return + + prompt = [entry.get('qtext', '')] + attrs = dict() + if not is_binary and content is None: + # it's possible that we figured out the files are + # different without reading in the local file. if the + # supplied version of the file is not binary, we now have + # to read in the local file to figure out if _it_ is + # binary, and either include that fact or the diff in our + # prompts for -I and the reports + try: + content = open(entry.get('name')).read() + except IOError: + self.logger.error("POSIX: Failed to read %s: %s" % + (entry.get("name"), sys.exc_info()[1])) + return False + is_binary &= self._is_string(content, self.setup['encoding']) + if is_binary: + # don't compute diffs if the file is binary + prompt.append('Binary file, no printable diff') + attrs['current_bfile'] = binascii.b2a_base64(content) + else: + if interactive: + diff = self._diff(content, tempdata, + difflib.unified_diff, + filename=entry.get("name")) + if diff: + udiff = '\n'.join(diff) + try: + prompt.append(udiff.decode(self.setup['encoding'])) + except UnicodeEncodeError: + prompt.append("Could not encode diff") + else: + prompt.append("Diff took too long to compute, no " + "printable diff") + if not sensitive: + diff = self._diff(content, tempdata, difflib.ndiff, + filename=entry.get("name")) + if diff: + ndiff = binascii.b2a_base64("\n".join(diff)) + attrs["current_bdiff"] = ndiff + else: + attrs['current_bfile'] = binascii.b2a_base64(content) + if interactive: + entry.set("qtext", "\n".join(prompt)) + if not sensitive: + for attr, val in attrs: + entry.set(attr, val) + + def _diff(self, content1, content2, difffunc, filename=None): + rv = [] + start = time.time() + longtime = False + for diffline in difffunc(content1.split('\n'), + content2.split('\n')): + now = time.time() + rv.append(diffline) + if now - start > 5 and not longtime: + if filename: + self.logger.info("POSIX: Diff of %s taking a long time" % + filename) + else: + self.logger.info("POSIX: Diff taking a long time") + longtime = True + elif now - start > 30: + if filename: + self.logger.error("POSIX: Diff of %s took too long; giving " + "up" % filename) + else: + self.logger.error("POSIX: Diff took too long; giving up") + return False + return rv diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py b/src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py new file mode 100644 index 000000000..569ca3445 --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py @@ -0,0 +1,39 @@ +import os +import sys +from base import POSIXTool + +class POSIXHardlink(POSIXTool): + __req__ = ['name', 'to'] + + def verify(self, entry, modlist): + rv = True + + try: + if not os.path.samefile(entry.get('name'), entry.get('to')): + msg = "Hardlink %s is incorrect" % entry.get('name') + self.logger.debug("POSIX: " + msg) + entry.set('qtext', "\n".join([entry.get('qtext', ''), msg])) + rv = False + except OSError: + self.logger.debug("POSIX: %s %s does not exist" % + (entry.tag, entry.get("name"))) + entry.set('current_exists', 'false') + return False + + return POSIXTool.verify(self, entry, modlist) and rv + + def install(self, entry): + ondisk = self._exists(entry, remove=True) + if ondisk: + self.logger.info("POSIX: Hardlink %s cleanup failed" % + entry.get('name')) + try: + os.link(entry.get('to'), entry.get('name')) + rv = True + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to create hardlink %s to %s: %s" % + (entry.get('name'), entry.get('to'), err)) + rv = False + return POSIXTool.install(self, entry) and rv + diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py b/src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py new file mode 100644 index 000000000..64a36cce4 --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py @@ -0,0 +1,41 @@ +import os +import sys +import shutil +from base import POSIXTool + +class POSIXNonexistent(POSIXTool): + __req__ = ['name'] + + def verify(self, entry, _): + if os.path.lexists(entry.get('name')): + self.logger.debug("POSIX: %s exists but should not" % + entry.get("name")) + return False + return True + + def install(self, entry): + ename = entry.get('name') + if entry.get('recursive', '').lower() == 'true': + # ensure that configuration spec is consistent first + for struct in self.config.getchildren(): + for entry in struct.getchildren(): + if (entry.tag == 'Path' and + entry.get('type') != 'nonexistent' and + entry.get('name').startswith(ename)): + self.logger.error('POSIX: Not removing %s. One or ' + 'more files in this directory are ' + 'specified in your configuration.' % + ename) + return False + rm = shutil.rmtree + elif os.path.isdir(ename): + rm = os.rmdir + else: + rm = os.remove + try: + rm(ename) + return True + except OSError: + err = sys.exc_info()[1] + self.logger.error('POSIX: Failed to remove %s: %s' % (ename, err)) + return False diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py b/src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py new file mode 100644 index 000000000..c041b9ade --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py @@ -0,0 +1,7 @@ +import os +import sys +from base import POSIXTool + +class POSIXPermissions(POSIXTool): + __req__ = ['name', 'perms', 'owner', 'group'] + diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py b/src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py new file mode 100644 index 000000000..d5222513e --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py @@ -0,0 +1,42 @@ +import os +import sys +from base import POSIXTool + +class POSIXSymlink(POSIXTool): + __req__ = ['name', 'to'] + + def verify(self, entry, modlist): + rv = True + + try: + sloc = os.readlink(entry.get('name')) + if sloc != entry.get('to'): + entry.set('current_to', sloc) + msg = ("Symlink %s points to %s, should be %s" % + (entry.get('name'), sloc, entry.get('to'))) + self.logger.debug("POSIX: " + msg) + entry.set('qtext', "\n".join([entry.get('qtext', ''), msg])) + rv = False + except OSError: + self.logger.debug("POSIX: %s %s does not exist" % + (entry.tag, entry.get("name"))) + entry.set('current_exists', 'false') + return False + + return POSIXTool.verify(self, entry, modlist) and rv + + def install(self, entry): + ondisk = self._exists(entry, remove=True) + if ondisk: + self.logger.info("POSIX: Symlink %s cleanup failed" % + entry.get('name')) + try: + os.symlink(entry.get('to'), entry.get('name')) + rv = True + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to create symlink %s to %s: %s" % + (entry.get('name'), entry.get('to'), err)) + rv = False + return POSIXTool.install(self, entry) and rv + diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/__init__.py b/src/lib/Bcfg2/Client/Tools/POSIX/__init__.py new file mode 100644 index 000000000..7e649a2c1 --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/__init__.py @@ -0,0 +1,147 @@ +"""All POSIX Type client support for Bcfg2.""" + +import os +import re +import sys +import shutil +import pkgutil +from datetime import datetime +import Bcfg2.Client.Tools +from base import POSIXTool + +class POSIX(Bcfg2.Client.Tools.Tool): + """POSIX File support code.""" + name = 'POSIX' + + def __init__(self, logger, setup, config): + Bcfg2.Client.Tools.Tool.__init__(self, logger, setup, config) + self.ppath = setup['ppath'] + self.max_copies = setup['max_copies'] + self._load_handlers() + self.logger.debug("POSIX: Handlers loaded: %s" % + (", ".join(self._handlers.keys()))) + self.__req__ = dict(Path=dict()) + for etype, hdlr in self._handlers.items(): + self.__req__['Path'][etype] = hdlr.__req__ + self.__handles__.append(('Path', etype)) + # Tool.__init__() sets up the list of handled entries, but we + # need to do it again after __handles__ has been populated. we + # can't populate __handles__ when the class is created because + # _load_handlers() _must_ be called at run-time, not at + # compile-time. + for struct in config: + self.handled = [e for e in struct if self.handlesEntry(e)] + + def _load_handlers(self): + # this must be called at run-time, not at compile-time, or we + # get wierd circular import issues. + self._handlers = dict() + if hasattr(pkgutil, 'walk_packages'): + submodules = pkgutil.walk_packages(path=__path__) + else: + # python 2.4 + import glob + submodules = [] + for path in __path__: + for submodule in glob.glob(os.path.join(path, "*.py")): + mod = os.path.splitext(os.path.basename(submodule))[0] + if mod not in ['__init__']: + submodules.append((None, mod, True)) + + for submodule in submodules: + if submodule[1] == 'base': + continue + module = getattr(__import__("%s.%s" % + (__name__, + submodule[1])).Client.Tools.POSIX, + submodule[1]) + hdlr = getattr(module, "POSIX" + submodule[1]) + if POSIXTool in hdlr.__mro__: + # figure out what entry type this handler handles + etype = hdlr.__name__[5:].lower() + self._handlers[etype] = hdlr(self.logger, + self.setup, + self.config) + + def canVerify(self, entry): + if not Bcfg2.Client.Tools.Tool.canVerify(self, entry): + return False + if not self._handlers[entry.get("type")].fully_specified(entry): + self.logger.error('POSIX: Cannot verify incomplete entry %s. ' + 'Try running bcfg2-lint.' % + entry.get('name')) + return False + return True + + def canInstall(self, entry): + """Check if entry is complete for installation.""" + if not Bcfg2.Client.Tools.Tool.canInstall(self, entry): + return False + if not self._handlers[entry.get("type")].fully_specified(entry): + self.logger.error('POSIX: Cannot install incomplete entry %s. ' + 'Try running bcfg2-lint.' % + entry.get('name')) + return False + return True + + def InstallPath(self, entry): + """Dispatch install to the proper method according to type""" + self.logger.debug("POSIX: Installing entry %s:%s:%s" % + (entry.tag, entry.get("type"), entry.get("name"))) + self._paranoid_backup(entry) + return self._handlers[entry.get("type")].install(entry) + + def VerifyPath(self, entry, modlist): + """Dispatch verify to the proper method according to type""" + self.logger.debug("POSIX: Verifying entry %s:%s:%s" % + (entry.tag, entry.get("type"), entry.get("name"))) + ret = self._handlers[entry.get("type")].verify(entry, modlist) + if self.setup['interactive'] and not ret: + entry.set('qtext', + '%s\nInstall %s %s: (y/N) ' % + (entry.get('qtext', ''), + entry.get('type'), entry.get('name'))) + return ret + + def _prune_old_backups(self, entry): + bkupnam = entry.get('name').replace('/', '_') + bkup_re = re.compile(bkupnam + \ + r'_\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6}$') + # current list of backups for this file + try: + bkuplist = [f for f in os.listdir(self.ppath) if + bkup_re.match(f)] + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to create backup list in %s: %s" % + (self.ppath, err)) + return + bkuplist.sort() + while len(bkuplist) >= int(self.max_copies): + # remove the oldest backup available + oldest = bkuplist.pop(0) + self.logger.info("POSIX: Removing old backup %s" % oldest) + try: + os.remove(os.path.join(self.ppath, oldest)) + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to remove old backup %s: %s" % + (os.path.join(self.ppath, oldest), err)) + + def _paranoid_backup(self, entry): + if (entry.get("paranoid", 'false').lower() == 'true' and + self.setup.get("paranoid", False) and + entry.get('current_exists', 'true') == 'true' and + not os.path.isdir(entry.get("name"))): + self._prune_old_backups(entry) + bkupnam = "%s_%s" % (entry.get('name').replace('/', '_'), + datetime.isoformat(datetime.now())) + bfile = os.path.join(self.ppath, bkupnam) + try: + shutil.copy(entry.get('name'), bfile) + self.logger.info("POSIX: Backup of %s saved to %s" % + (entry.get('name'), bfile)) + except IOError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to create backup file for %s: " + "%s" % (entry.get('name'), err)) diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py new file mode 100644 index 000000000..1ec1d36d5 --- /dev/null +++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py @@ -0,0 +1,639 @@ +import os +import sys +import pwd +import grp +import stat +import shutil +import Bcfg2.Client.Tools +import Bcfg2.Client.XML + +try: + import selinux + has_selinux = True +except ImportError: + has_selinux = False + +try: + import posix1e + has_acls = True +except ImportError: + has_acls = False + +# map between dev_type attribute and stat constants +device_map = dict(block=stat.S_IFBLK, + char=stat.S_IFCHR, + fifo=stat.S_IFIFO) + +# map between permissions characters and numeric ACL constants +acl_map = dict(r=posix1e.ACL_READ, + w=posix1e.ACL_WRITE, + x=posix1e.ACL_EXECUTE) + +class POSIXTool(Bcfg2.Client.Tools.Tool): + def fully_specified(self, entry): + # checking is done by __req__ + return True + + def verify(self, entry, modlist): + if not self._verify_metadata(entry): + return False + if entry.get('recursive', 'false').lower() == 'true': + # verify ownership information recursively + for root, dirs, files in os.walk(entry.get('name')): + for p in dirs + files: + if not self._verify_metadata(entry, + path=os.path.join(root, p)): + return False + return True + + def install(self, entry): + plist = [entry.get('name')] + rv = True + rv &= self._set_perms(entry) + if entry.get('recursive', 'false').lower() == 'true': + # set metadata recursively + for root, dirs, files in os.walk(entry.get('name')): + for path in dirs + files: + rv &= self._set_perms(entry, path=os.path.join(root, path)) + return rv + + def _exists(self, entry, remove=False): + try: + # check for existing paths and optionally remove them + ondisk = os.lstat(entry.get('name')) + if remove: + if os.path.isdir(entry.get('name')): + rm = shutil.rmtree + else: + rm = os.unlink + try: + rm(entry.get('name')) + return False + except OSError: + err = sys.exc_info()[1] + self.logger.warning('POSIX: Failed to unlink %s: %s' % + (entry.get('name'), err)) + return ondisk # probably still exists + else: + return ondisk + except OSError: + return False + + def _set_perms(self, entry, path=None): + if path is None: + path = entry.get("name") + + rv = True + if entry.get("owner") and entry.get("group"): + try: + self.logger.debug("POSIX: Setting ownership of %s to %s:%s" % + (path, + self._norm_entry_uid(entry), + self._norm_entry_gid(entry))) + os.chown(path, self._norm_entry_uid(entry), + self._norm_entry_gid(entry)) + except KeyError: + self.logger.error('POSIX: Failed to change ownership of %s' % + path) + rv = False + os.chown(path, 0, 0) + except OSError: + self.logger.error('POSIX: Failed to change ownership of %s' % + path) + rv = False + + if entry.get("perms"): + configPerms = int(entry.get('perms'), 8) + if entry.get('dev_type'): + configPerms |= device_map[entry.get('dev_type')] + try: + self.logger.debug("POSIX: Setting permissions on %s to %s" % + (path, oct(configPerms))) + os.chmod(path, configPerms) + except (OSError, KeyError): + self.logger.error('POSIX: Failed to change permissions on %s' % + path) + rv = False + + if entry.get('mtime'): + try: + os.utime(entry.get('name'), (int(entry.get('mtime')), + int(entry.get('mtime')))) + except OSError: + self.logger.error("POSIX: Failed to set mtime of %s" % path) + rv = False + + rv &= self._set_secontext(entry, path=path) + rv &= self._set_acls(entry, path=path) + return rv + + + def _set_acls(self, entry, path=None): + """ set POSIX ACLs on the file on disk according to the config """ + if not has_acls: + if entry.findall("ACL"): + self.logger.debug("POSIX: ACLs listed for %s but no pylibacl " + "library installed" % entry.get('name')) + return True + + if path is None: + path = entry.get("name") + + try: + acl = posix1e.ACL(file=path) + except IOError: + err = sys.exc_info()[1] + if err.errno == 95: + # fs is mounted noacl + self.logger.error("POSIX: Cannot set ACLs on filesystem " + "mounted without ACL support: %s" % path) + else: + self.logger.error("POSIX: Error getting current ACLS on %s: %s" + % (path, err)) + return False + # clear ACLs out so we start fresh -- way easier than trying + # to add/remove/modify ACLs + for aclentry in acl: + if aclentry.tag_type in [posix1e.ACL_USER, posix1e.ACL_GROUP]: + acl.delete_entry(aclentry) + if os.path.isdir(path): + defacl = posix1e.ACL(filedef=path) + if not defacl.valid(): + # when a default ACL is queried on a directory that + # has no default ACL entries at all, you get an empty + # ACL, which is not valid. in this circumstance, we + # just copy the access ACL to get a base valid ACL + # that we can add things to. + defacl = posix1e.ACL(acl=acl) + else: + for aclentry in defacl: + if aclentry.tag_type in [posix1e.ACL_USER, + posix1e.ACL_GROUP]: + defacl.delete_entry(aclentry) + else: + defacl = None + + for aclkey, perms in self._list_entry_acls(entry).items(): + atype, scope, qualifier = aclkey + if atype == "default": + if defacl is None: + self.logger.warning("POSIX: Cannot set default ACLs on " + "non-directory %s" % path) + continue + entry = posix1e.Entry(defacl) + else: + entry = posix1e.Entry(acl) + for perm in acl_map.values(): + if perm & perms: + entry.permset.add(perm) + entry.tag_type = scope + try: + if scope == posix1e.ACL_USER: + scopename = "user" + entry.qualifier = self._norm_uid(qualifier) + elif scope == posix1e.ACL_GROUP: + scopename = "group" + entry.qualifier = self._norm_gid(qualifier) + except (OSError, KeyError): + err = sys.exc_info()[1] + self.logger.error("POSIX: Could not resolve %s %s: %s" % + (scopename, qualifier, err)) + continue + acl.calc_mask() + + def _apply_acl(acl, path, atype=posix1e.ACL_TYPE_ACCESS): + if atype == posix1e.ACL_TYPE_ACCESS: + atype_str = "access" + else: + atype_str = "default" + if acl.valid(): + self.logger.debug("POSIX: Applying %s ACL to %s:" % (atype_str, + path)) + for line in str(acl).splitlines(): + self.logger.debug(" " + line) + try: + acl.applyto(path, atype) + return True + except: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to set ACLs on %s: %s" % + (path, err)) + return False + else: + self.logger.warning("POSIX: %s ACL created for %s was invalid:" + % (atype_str.title(), path)) + for line in str(acl).splitlines(): + self.logger.warning(" " + line) + return False + + rv = _apply_acl(acl, path) + if defacl: + defacl.calc_mask() + rv &= _apply_acl(defacl, path, posix1e.ACL_TYPE_DEFAULT) + return rv + + def _set_secontext(self, entry, path=None): + """ set the SELinux context of the file on disk according to the + config""" + if not has_selinux: + return True + + if path is None: + path = entry.get("name") + context = entry.get("secontext") + if context is None: + # no context listed + return True + + if context == '__default__': + try: + selinux.restorecon(path) + rv = True + except: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to restore SELinux context " + "for %s: %s" % (path, err)) + rv = False + else: + try: + rv = selinux.lsetfilecon(path, context) == 0 + except: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to restore SELinux context " + "for %s: %s" % (path, err)) + rv = False + return rv + + def _norm_gid(self, gid): + """ This takes a group name or gid and returns the + corresponding gid. """ + try: + return int(gid) + except ValueError: + return int(grp.getgrnam(gid)[2]) + + def _norm_entry_gid(self, entry): + try: + return self._norm_gid(entry.get('group')) + except (OSError, KeyError): + err = sys.exc_info()[1] + self.logger.error('POSIX: GID normalization failed for %s on %s: %s' + % (entry.get('group'), entry.get('name'), err)) + return 0 + + def _norm_uid(self, uid): + """ This takes a username or uid and returns the + corresponding uid. """ + try: + return int(uid) + except ValueError: + return int(pwd.getpwnam(uid)[2]) + + def _norm_entry_uid(self, entry): + try: + return self._norm_uid(entry.get("owner")) + except (OSError, KeyError): + err = sys.exc_info()[1] + self.logger.error('POSIX: UID normalization failed for %s on %s: %s' + % (entry.get('owner'), entry.get('name'), err)) + return 0 + + def _norm_acl_perms(self, perms): + """ takes a representation of an ACL permset and returns a digit + representing the permissions entailed by it. representations can + either be a single octal digit, a string of up to three 'r', + 'w', 'x', or '-' characters, or a posix1e.Permset object""" + if hasattr(perms, 'test'): + # Permset object + return sum([p for p in acl_map.values() + if perms.test(p)]) + + try: + # single octal digit + rv = int(perms) + if rv > 0 and rv < 8: + return rv + else: + self.logger.error("POSIX: Permissions digit out of range in " + "ACL: %s" % perms) + return 0 + except ValueError: + # couldn't be converted to an int; process as a string + if len(perms) > 3: + self.logger.error("POSIX: Permissions string too long in ACL: " + "%s" % perms) + return 0 + rv = 0 + for char in perms: + if char == '-': + continue + elif char not in acl_map: + self.logger.warning("POSIX: Unknown permissions character " + "in ACL: %s" % char) + elif rv & acl_map[char]: + self.logger.warning("POSIX: Duplicate permissions " + "character in ACL: %s" % perms) + else: + rv |= acl_map[char] + return rv + + def _acl2string(self, aclkey, perms): + atype, scope, qualifier = aclkey + acl_str = [] + if atype == 'default': + acl_str.append(atype) + if scope == posix1e.ACL_USER: + acl_str.append("user") + elif scope == posix1e.ACL_GROUP: + acl_str.append("group") + acl_str.append(qualifier) + acl_str.append(self._acl_perm2string(perms)) + return ":".join(acl_str) + + def _acl_perm2string(self, perm): + rv = [] + for char in 'rwx': + if acl_map[char] & perm: + rv.append(char) + else: + rv.append('-') + return ''.join(rv) + + def _gather_data(self, path): + try: + ondisk = os.stat(path) + except OSError: + self.logger.debug("POSIX: %s does not exist" % path) + return (False, None, None, None, None, None) + + try: + owner = str(ondisk[stat.ST_UID]) + except OSError: + err = sys.exc_info()[1] + self.logger.debug("POSIX: Could not get current owner of %s: %s" % + (path, err)) + owner = None + except KeyError: + self.logger.error('POSIX: User resolution failed for %s' % path) + owner = None + + try: + group = str(ondisk[stat.ST_GID]) + except (OSError, KeyError): + err = sys.exc_info()[1] + self.logger.debug("POSIX: Could not get current group of %s: %s" % + (path, err)) + group = None + except KeyError: + self.logger.error('POSIX: Group resolution failed for %s' % path) + group = None + + try: + perms = oct(ondisk[stat.ST_MODE])[-4:] + except (OSError, KeyError, TypeError): + err = sys.exc_info()[1] + self.logger.debug("POSIX: Could not get current permissions of %s: " + "%s" % (path, err)) + perms = None + + if has_selinux: + try: + secontext = selinux.getfilecon(path)[1].split(":")[2] + except (OSError, KeyError): + err = sys.exc_info()[1] + self.logger.debug("POSIX: Could not get current SELinux " + "context of %s: %s" % (path, err)) + secontext = None + else: + secontext = None + + if has_acls: + acls = self._list_file_acls(path) + else: + acls = None + return (ondisk, owner, group, perms, secontext, acls) + + def _verify_metadata(self, entry, path=None): + """ generic method to verify perms, owner, group, secontext, acls, + and mtime """ + # allow setting an alternate path for recursive permissions checking + if path is None: + path = entry.get('name') + attrib = dict() + ondisk, attrib['current_owner'], attrib['current_group'], \ + attrib['current_perms'], attrib['current_secontext'], acls = \ + self._gather_data(path) + + if not ondisk: + entry.set('current_exists', 'false') + return False + + # we conditionally verify every bit of metadata only if it's + # specified on the entry. consequently, canVerify() and + # fully_specified() are preconditions of _verify_metadata(), + # since they will ensure that everything that needs to be + # specified actually is. this lets us gracefully handle + # symlink and hardlink entries, which have SELinux contexts + # but not other permissions, optional secontext and mtime + # attrs, and so on. + configOwner, configGroup, configPerms, mtime = None, None, None, -1 + if entry.get('mtime', '-1') != '-1': + mtime = str(ondisk[stat.ST_MTIME]) + if entry.get("owner"): + configOwner = str(self._norm_entry_uid(entry)) + if entry.get("group"): + configGroup = str(self._norm_entry_gid(entry)) + if entry.get("perms"): + while len(entry.get('perms', '')) < 4: + entry.set('perms', '0' + entry.get('perms', '')) + configPerms = int(entry.get('perms'), 8) + + errors = [] + if configOwner and attrib['current_owner'] != configOwner: + errors.append("Owner for path %s is incorrect. " + "Current owner is %s but should be %s" % + (path, attrib['current_owner'], entry.get('owner'))) + + if configGroup and attrib['current_group'] != configGroup: + errors.append("Group for path %s is incorrect. " + "Current group is %s but should be %s" % + (path, attrib['current_group'], entry.get('group'))) + + if (configPerms and + oct(int(attrib['current_perms'], 8)) != oct(configPerms)): + errors.append("Permissions for path %s are incorrect. " + "Current permissions are %s but should be %s" % + (path, attrib['current_perms'], entry.get('perms'))) + + if entry.get('mtime'): + attrib['current_mtime'] = mtime + if mtime != entry.get('mtime', '-1'): + errors.append("mtime for path %s is incorrect. " + "Current mtime is %s but should be %s" % + (path, mtime, entry.get('mtime'))) + + if has_selinux and entry.get("secontext"): + if entry.get("secontext") == "__default__": + configContext = selinux.matchpathcon(path, 0)[1].split(":")[2] + else: + configContext = entry.get("secontext") + if attrib['current_secontext'] != configContext: + errors.append("SELinux context for path %s is incorrect. " + "Current context is %s but should be %s" % + (path, attrib['current_secontext'], + configContext)) + + if errors: + for error in errors: + self.logger.debug("POSIX: " + error) + entry.set('qtext', "\n".join([entry.get('qtext', '')] + errors)) + if path == entry.get("name"): + for attr, val in attrib.items(): + entry.set(attr, val) + + aclVerifies = self._verify_acls(entry, path=path) + return aclVerifies and len(errors) == 0 + + def _list_entry_acls(self, entry): + wanted = dict() + for acl in entry.findall("ACL"): + if acl.get("scope") == "user": + scope = posix1e.ACL_USER + elif acl.get("scope") == "group": + scope = posix1e.ACL_GROUP + else: + self.logger.error("POSIX: Unknown ACL scope %s" % + acl.get("scope")) + continue + wanted[(acl.get("type"), scope, acl.get(acl.get("scope")))] = \ + self._norm_acl_perms(acl.get('perms')) + return wanted + + def _list_file_acls(self, path): + def _process_acl(acl, atype): + try: + if acl.tag_type == posix1e.ACL_USER: + qual = pwd.getpwuid(acl.qualifier)[0] + elif acl.tag_type == posix1e.ACL_GROUP: + qual = grp.getgrgid(acl.qualifier)[0] + else: + return + except (OSError, KeyError): + err = sys.exc_info()[1] + self.logger.error("POSIX: Lookup of %s %s failed: %s" % + (scope, acl.qualifier, err)) + qual = acl.qualifier + existing[(atype, acl.tag_type, qual)] = \ + self._norm_acl_perms(acl.permset) + + existing = dict() + try: + for acl in posix1e.ACL(file=path): + _process_acl(acl, "access") + except IOError: + err = sys.exc_info()[1] + if err.errno == 95: + # fs is mounted noacl + self.logger.debug("POSIX: Filesystem mounted without ACL " + "support: %s" % path) + else: + self.logger.error("POSIX: Error getting current ACLS on %s: %s" + % (path, err)) + return existing + + if os.path.isdir(path): + for acl in posix1e.ACL(filedef=path): + _process_acl(acl, "default") + return existing + + def _verify_acls(self, entry, path=None): + if not has_acls: + if entry.findall("ACL"): + self.logger.debug("POSIX: ACLs listed for %s but no pylibacl " + "library installed" % entry.get('name')) + return True + + if path is None: + path = entry.get("name") + + # create lists of normalized representations of the ACLs we want + # and the ACLs we have. this will make them easier to compare + # than trying to mine that data out of the ACL objects and XML + # objects and compare it at the same time. + wanted = self._list_entry_acls(entry) + existing = self._list_file_acls(path) + + missing = [] + extra = [] + wrong = [] + for aclkey, perms in wanted.items(): + if aclkey not in existing: + missing.append(self._acl2string(aclkey, perms)) + elif existing[aclkey] != perms: + wrong.append((self._acl2string(aclkey, perms), + self._acl2string(aclkey, existing[aclkey]))) + if path == entry.get("name"): + atype, scope, qual = aclkey + aclentry = Bcfg2.Client.XML.Element("ACL", type=atype, + perms=str(perms)) + if scope == posix1e.ACL_USER: + aclentry.set("scope", "user") + elif scope == posix1e.ACL_GROUP: + aclentry.set("scope", "group") + else: + self.logger.debug("POSIX: Unknown ACL scope %s on %s" % + (scope, path)) + continue + aclentry.set(aclentry.get("scope"), qual) + entry.append(aclentry) + + for aclkey, perms in existing.items(): + if aclkey not in wanted: + extra.append(self._acl2string(aclkey, perms)) + + msg = [] + if missing: + msg.append("%s ACLs are missing: %s" % (len(missing), + ", ".join(missing))) + if wrong: + msg.append("%s ACLs are wrong: %s" % + (len(wrong), + "; ".join(["%s should be %s" % (e, w) + for w, e in wrong]))) + if extra: + msg.append("%s extra ACLs: %s" % (len(extra), ", ".join(extra))) + + if msg: + msg.insert(0, "POSIX: ACLs for %s are incorrect." % path) + self.logger.debug(msg[0]) + for line in msg[1:]: + self.logger.debug(" " + line) + entry.set('qtext', "\n".join([entry.get("qtext", '')] + msg)) + return False + return True + + def _makedirs(self, entry, path=None): + """ os.makedirs helpfully creates all parent directories for + us, but it sets permissions according to umask, which is + probably wrong. we need to find out which directories were + created and set permissions on those + (http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125) """ + created = [] + if path is None: + path = entry.get("name") + cur = path + while cur != '/': + if not os.path.exists(cur): + created.append(cur) + cur = os.path.dirname(cur) + rv = True + try: + os.makedirs(path) + except OSError: + err = sys.exc_info()[1] + self.logger.error('POSIX: Failed to create directory %s: %s' % + (path, err)) + rv = False + for cpath in created: + rv &= self._set_perms(entry, path=cpath) + return rv diff --git a/src/lib/Bcfg2/Client/Tools/__init__.py b/src/lib/Bcfg2/Client/Tools/__init__.py index e4a0ec220..1f191fce3 100644 --- a/src/lib/Bcfg2/Client/Tools/__init__.py +++ b/src/lib/Bcfg2/Client/Tools/__init__.py @@ -1,17 +1,27 @@ """This contains all Bcfg2 Tool modules""" import os -import stat import sys -from subprocess import Popen, PIPE +import stat import time +import pkgutil +from subprocess import Popen, PIPE import Bcfg2.Client.XML from Bcfg2.Bcfg2Py3k import input -__all__ = [tool.split('.')[0] \ - for tool in os.listdir(os.path.dirname(__file__)) \ - if tool.endswith(".py") and tool != "__init__.py"] - +if hasattr(pkgutil, 'walk_packages'): + submodules = pkgutil.walk_packages(path=__path__) +else: + # python 2.4 + import glob + submodules = [] + for path in __path__: + for submodule in glob.glob(os.path.join(path, "*.py")): + mod = os.path.splitext(os.path.basename(submodule))[0] + if mod not in ['__init__']: + submodules.append((None, mod, True)) + +__all__ = [m[1] for m in submodules] drivers = [item for item in __all__ if item not in ['rpmtools']] default = [item for item in drivers if item not in ['RPM', 'Yum']] @@ -37,7 +47,7 @@ class executor: return (p.returncode, output.splitlines()) -class Tool: +class Tool(object): """ All tools subclass this. It defines all interfaces that need to be defined. """ @@ -48,10 +58,6 @@ class Tool: __important__ = [] def __init__(self, logger, setup, config): - self.__important__ = [entry.get('name') \ - for struct in config for entry in struct \ - if entry.tag == 'Path' and \ - entry.get('important') in ['true', 'True']] self.setup = setup self.logger = logger if not hasattr(self, '__ireq__'): @@ -60,8 +66,15 @@ class Tool: self.cmd = executor(logger) self.modified = [] self.extra = [] - self.handled = [entry for struct in self.config for entry in struct \ - if self.handlesEntry(entry)] + self.__important__ = [] + self.handled = [] + for struct in config: + for entry in struct: + if (entry.tag == 'Path' and + entry.get('important', 'false').lower() == 'true'): + self.__important__.append(entry.get('name')) + if self.handlesEntry(entry): + self.handled.append(entry) for filename in self.__execs__: try: mode = stat.S_IMODE(os.stat(filename)[stat.ST_MODE]) @@ -131,7 +144,7 @@ class Tool: '''Build a list of potentially modified POSIX paths for this entry''' return [entry.get('name') for struct in self.config.getchildren() \ for entry in struct.getchildren() \ - if entry.tag in ['Ignore', 'Path']] + if entry.tag == 'Path'] def gatherCurrentData(self, entry): """Default implementation of the information gathering routines.""" @@ -163,10 +176,10 @@ class Tool: missing = self.missing_attrs(entry) if missing: - self.logger.error("Incomplete information for entry %s:%s; cannot verify" \ - % (entry.tag, entry.get('name'))) - self.logger.error("\t... due to absence of %s attribute(s)" % \ - (":".join(missing))) + self.logger.error("Cannot verify entry %s:%s due to missing " + "required attribute(s): %s" % + (entry.tag, entry.get('name'), + ", ".join(missing))) try: self.gatherCurrentData(entry) except: diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index e768284b9..320a219d3 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -933,6 +933,8 @@ CLIENT_COMMON_OPTIONS = \ drivers=CLIENT_DRIVERS, dryrun=CLIENT_DRYRUN, paranoid=CLIENT_PARANOID, + ppath=PARANOID_PATH, + max_copies=PARANOID_MAX_COPIES, bundle=CLIENT_BUNDLE, skipbundle=CLIENT_SKIPBUNDLE, bundle_quick=CLIENT_BUNDLEQUICK, diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py new file mode 100644 index 000000000..7d64c5a2e --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py @@ -0,0 +1,139 @@ +import os +import copy +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Device import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_device_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXDevice(posix.logger, posix.setup, posix.config) + +class TestPOSIXDevice(unittest.TestCase): + def test_fully_specified(self): + ptool = get_device_object() + orig_entry = lxml.etree.Element("Path", name="/test", type="device", + dev_type="fifo") + self.assertTrue(ptool.fully_specified(orig_entry)) + for dtype in ["block", "char"]: + for attr in ["major", "minor"]: + entry = copy.deepcopy(orig_entry) + entry.set("dev_type", dtype) + entry.set(attr, "0") + self.assertFalse(ptool.fully_specified(entry)) + entry = copy.deepcopy(orig_entry) + entry.set("dev_type", dtype) + entry.set("major", "0") + entry.set("minor", "0") + self.assertTrue(ptool.fully_specified(entry)) + + @patch("os.major") + @patch("os.minor") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + def test_verify(self, mock_verify, mock_exists, mock_minor, mock_major): + entry = lxml.etree.Element("Path", name="/test", type="device", + perms='0644', owner='root', group='root', + dev_type="block", major="0", minor="10") + ptool = get_device_object() + + def reset(): + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_minor.reset_mock() + mock_major.reset_mock() + + mock_exists.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + + reset() + mock_exists.return_value = MagicMock() + mock_major.return_value = 0 + mock_minor.return_value = 10 + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_verify.assert_called_with(ptool, entry, []) + mock_exists.assert_called_with(entry) + mock_major.assert_called_with(mock_exists.return_value.st_rdev) + mock_minor.assert_called_with(mock_exists.return_value.st_rdev) + + reset() + mock_exists.return_value = MagicMock() + mock_major.return_value = 0 + mock_minor.return_value = 10 + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_verify.assert_called_with(ptool, entry, []) + mock_exists.assert_called_with(entry) + mock_major.assert_called_with(mock_exists.return_value.st_rdev) + mock_minor.assert_called_with(mock_exists.return_value.st_rdev) + + reset() + mock_verify.return_value = True + entry = lxml.etree.Element("Path", name="/test", type="device", + perms='0644', owner='root', group='root', + dev_type="fifo") + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + self.assertFalse(mock_major.called) + self.assertFalse(mock_minor.called) + + @patch("os.makedev") + @patch("os.mknod") + @patch("Bcfg2.Client.Tools.POSIX.Device.POSIXDevice._exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + def test_install(self, mock_install, mock_exists, mock_mknod, mock_makedev): + entry = lxml.etree.Element("Path", name="/test", type="device", + perms='0644', owner='root', group='root', + dev_type="block", major="0", minor="10") + ptool = get_device_object() + + mock_exists.return_value = False + mock_makedev.return_value = Mock() + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_makedev.assert_called_with(0, 10) + mock_mknod.assert_called_with(entry.get("name"), + device_map[entry.get("dev_type")] | 0644, + mock_makedev.return_value) + mock_install.assert_called_with(ptool, entry) + + mock_makedev.reset_mock() + mock_mknod.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_makedev.side_effect = OSError + self.assertFalse(ptool.install(entry)) + + mock_makedev.reset_mock() + mock_mknod.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_mknod.side_effect = OSError + self.assertFalse(ptool.install(entry)) + + mock_makedev.reset_mock() + mock_mknod.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_mknod.side_effect = None + entry = lxml.etree.Element("Path", name="/test", type="device", + perms='0644', owner='root', group='root', + dev_type="fifo") + + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_mknod.assert_called_with(entry.get("name"), + device_map[entry.get("dev_type")] | 0644) + mock_install.assert_called_with(ptool, entry) diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py new file mode 100644 index 000000000..021ed8113 --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py @@ -0,0 +1,154 @@ +import os +import stat +import copy +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Directory import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_directory_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXDirectory(posix.logger, posix.setup, posix.config) + +class TestPOSIXDirectory(unittest.TestCase): + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._exists") + def test_verify(self, mock_exists, mock_verify): + entry = lxml.etree.Element("Path", name="/test", type="directory", + perms='0644', owner='root', group='root') + ptool = get_directory_object() + + mock_exists.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + + mock_exists.reset_mock() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = stat.S_IFREG | 0644 + mock_exists.return_value = exists_rv + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + + mock_exists.reset_mock() + mock_verify.return_value = False + exists_rv.__getitem__.return_value = stat.S_IFDIR | 0644 + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + + with patch("os.listdir") as mock_listdir: + mock_exists.reset_mock() + mock_verify.reset_mock() + entry.set("prune", "true") + orig_entry = copy.deepcopy(entry) + + entries = ["foo", "bar", "bar/baz"] + mock_listdir.return_value = entries + modlist = [os.path.join(entry.get("name"), entries[0])] + self.assertFalse(ptool.verify(entry, modlist)) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, modlist) + mock_listdir.assert_called_with(entry.get("name")) + expected = [os.path.join(entry.get("name"), e) + for e in entries + if os.path.join(entry.get("name"), e) not in modlist] + actual = [e.get("path") for e in entry.findall("Prune")] + self.assertItemsEqual(expected, actual) + + mock_verify.reset_mock() + mock_exists.reset_mock() + mock_listdir.reset_mock() + entry = copy.deepcopy(orig_entry) + modlist = [os.path.join(entry.get("name"), e) + for e in entries] + self.assertTrue(ptool.verify(entry, modlist)) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, modlist) + mock_listdir.assert_called_with(entry.get("name")) + self.assertEqual(len(entry.findall("Prune")), 0) + + @patch("os.unlink") + @patch("shutil.rmtree") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._exists") + @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._makedirs") + def test_install(self, mock_makedirs, mock_exists, mock_install, + mock_rmtree, mock_unlink): + entry = lxml.etree.Element("Path", name="/test/foo/bar", + type="directory", perms='0644', + owner='root', group='root') + ptool = get_directory_object() + + def reset(): + mock_exists.reset_mock() + mock_install.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_rmtree.mock_makedirs() + + mock_makedirs.return_value = True + mock_exists.return_value = False + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry) + mock_install.assert_called_with(ptool, entry) + mock_makedirs.assert_called_with(entry) + + reset() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = stat.S_IFREG | 0644 + mock_exists.return_value = exists_rv + self.assertTrue(ptool.install(entry)) + mock_unlink.assert_called_with(entry.get("name")) + mock_exists.assert_called_with(entry) + mock_makedirs.assert_called_with(entry) + mock_install.assert_called_with(ptool, entry) + + reset() + exists_rv.__getitem__.return_value = stat.S_IFDIR | 0644 + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_install.return_value = False + self.assertFalse(ptool.install(entry)) + mock_install.assert_called_with(ptool, entry) + + entry.set("prune", "true") + prune = ["/test/foo/bar/prune1", "/test/foo/bar/prune2"] + for path in prune: + lxml.etree.SubElement(entry, "Prune", path=path) + + reset() + mock_install.return_value = True + with patch("os.path.isdir") as mock_isdir: + def isdir_rv(path): + if path.endswith("prune2"): + return True + else: + return False + mock_isdir.side_effect = isdir_rv + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry) + mock_install.assert_called_with(ptool, entry) + self.assertItemsEqual(mock_isdir.call_args_list, + [call(p) for p in prune]) + mock_unlink.assert_called_with("/test/foo/bar/prune1") + mock_rmtree.assert_called_with("/test/foo/bar/prune2") diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py new file mode 100644 index 000000000..a2cd52dd5 --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py @@ -0,0 +1,318 @@ +import os +import copy +import binascii +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.File import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_file_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXFile(posix.logger, posix.setup, posix.config) + +class TestPOSIXFile(unittest.TestCase): + def test_fully_specified(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + ptool = get_file_object() + self.assertFalse(ptool.fully_specified(entry)) + + entry.set("empty", "true") + self.assertTrue(ptool.fully_specified(entry)) + + entry.set("empty", "false") + entry.text = "text" + self.assertTrue(ptool.fully_specified(entry)) + + def test_is_string(self): + ptool = get_file_object() + for char in range(8) + range(14, 32): + self.assertFalse(ptool._is_string("foo" + chr(char) + "bar", + 'utf_8')) + for char in range(9, 14) + range(33, 128): + self.assertTrue(ptool._is_string("foo" + chr(char) + "bar", + 'utf_8')) + self.assertFalse(ptool._is_string("foo" + chr(128) + "bar", + 'ascii')) + ustr = '\xef\xa3\x91 + \xef\xa3\x92' + self.assertTrue(ptool._is_string(ustr, 'utf_8')) + self.assertFalse(ptool._is_string(ustr, 'ascii')) + + def test_get_data(self): + orig_entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(encoding="ascii", ppath='/', max_copies=5) + ptool = get_file_object(posix=get_posix_object(setup=setup)) + + entry = copy.deepcopy(orig_entry) + entry.text = binascii.b2a_base64("test") + entry.set("encoding", "base64") + self.assertEqual(ptool._get_data(entry), ("test", True)) + + entry = copy.deepcopy(orig_entry) + entry.set("empty", "true") + self.assertEqual(ptool._get_data(entry), ("", False)) + + entry = copy.deepcopy(orig_entry) + entry.text = "test" + self.assertEqual(ptool._get_data(entry), ("test", False)) + + ustr = u'\uf8d1 + \uf8d2' + entry = copy.deepcopy(orig_entry) + entry.text = ustr + self.assertEqual(ptool._get_data(entry), (ustr, False)) + + setup['encoding'] = "utf_8" + ptool = get_file_object(posix=get_posix_object(setup=setup)) + entry = copy.deepcopy(orig_entry) + entry.text = ustr + self.assertEqual(ptool._get_data(entry), + ('\xef\xa3\x91 + \xef\xa3\x92', False)) + + @patch("__builtin__.open") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._exists") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_diffs") + def test_verify(self, mock_get_diffs, mock_get_data, mock_exists, + mock_verify, mock_open): + entry = lxml.etree.Element("Path", name="/test", type="file") + setup = dict(interactive=False, ppath='/', max_copies=5) + ptool = get_file_object(posix=get_posix_object(setup=setup)) + + def reset(): + mock_get_diffs.reset_mock() + mock_get_data.reset_mock() + mock_exists.reset_mock() + mock_verify.reset_mock() + mock_open.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_exists.return_value = False + mock_verify.return_value = True + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=False, + content="") + + reset() + exists_rv = MagicMock() + exists_rv.__getitem__.return_value = 5 + mock_exists.return_value = exists_rv + mock_get_data.return_value = ("test", True) + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=False, + is_binary=True, + content=None) + + reset() + mock_get_data.return_value = ("test", False) + exists_rv.__getitem__.return_value = 4 + entry.set("sensitive", "true") + open_rv = Mock() + open_rv.read.return_value = "tart" + mock_open.return_value = open_rv + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + open_rv.assert_any_call() + mock_get_diffs.assert_called_with(entry, interactive=False, + sensitive=True, + is_binary=False, + content="tart") + + reset() + open_rv.read.return_value = "test" + mock_open.return_value = open_rv + self.assertTrue(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_verify.assert_called_with(ptool, entry, []) + mock_open.assert_called_with(entry.get("name")) + open_rv.assert_any_call() + self.assertFalse(mock_get_diffs.called) + + reset() + mock_open.side_effect = IOError + self.assertFalse(ptool.verify(entry, [])) + mock_exists.assert_called_with(entry) + mock_open.assert_called_with(entry.get("name")) + + @patch("os.fdopen") + @patch("tempfile.mkstemp") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data") + def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + ptool = get_file_object() + newfile = "/foo/bar" + + def reset(): + mock_get_data.reset_mock() + mock_mkstemp.reset_mock() + mock_fdopen.reset_mock() + + mock_get_data.return_value = ("test", False) + mock_mkstemp.return_value = (5, newfile) + self.assertEqual(ptool._write_tmpfile(entry), newfile) + mock_get_data.assert_called_with(entry) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_fdopen.assert_called_with(5, 'w') + mock_fdopen.return_value.write.assert_called_with("test") + + reset() + mock_mkstemp.side_effect = OSError + self.assertFalse(ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + + reset() + mock_mkstemp.side_effect = None + mock_fdopen.side_effect = OSError + self.assertFalse(ptool._write_tmpfile(entry)) + mock_mkstemp.assert_called_with(prefix='test', dir='/') + mock_get_data.assert_called_with(entry) + mock_fdopen.assert_called_with(5, 'w') + + @patch("os.rename") + @patch("os.unlink") + def test_rename_tmpfile(self, mock_unlink, mock_rename): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + ptool = get_file_object() + newfile = "/foo/bar" + + self.assertTrue(ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_rename.side_effect = OSError + self.assertFalse(ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + # even if the unlink fails, return false gracefully + mock_rename.reset_mock() + mock_unlink.reset_mock() + mock_unlink.side_effect = OSError + self.assertFalse(ptool._rename_tmpfile(newfile, entry)) + mock_rename.assert_called_with(newfile, entry.get("name")) + mock_unlink.assert_called_with(newfile) + + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._makedirs") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._set_perms") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._write_tmpfile") + @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._rename_tmpfile") + def test_install(self, mock_rename, mock_write, mock_set_perms, + mock_makedirs, mock_install, mock_exists): + entry = lxml.etree.Element("Path", name="/test", type="file", + perms='0644', owner='root', group='root') + ptool = get_file_object() + + def reset(): + mock_rename.reset_mock() + mock_write.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + mock_install.reset_mock() + mock_exists.reset_mock() + + mock_exists.return_value = False + mock_makedirs.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + + reset() + mock_makedirs.return_value = True + mock_write.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + + reset() + newfile = '/test.X987yS' + mock_write.return_value = newfile + mock_set_perms.return_value = False + mock_rename.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + + reset() + mock_rename.return_value = True + mock_install.return_value = False + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_install.return_value = True + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_set_perms.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with("/") + mock_makedirs.assert_called_with(entry, path="/") + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + reset() + mock_exists.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with("/") + self.assertFalse(mock_makedirs.called) + mock_write.assert_called_with(entry) + mock_set_perms.assert_called_with(entry, path=newfile) + mock_rename.assert_called_with(newfile, entry) + mock_install.assert_called_with(ptool, entry) + + def test_diff(self): + ptool = get_file_object() + content1 = "line1\nline2" + content2 = "line3" + rv = ["line1", "line2", "line3"] + func = Mock() + func.return_value = rv + self.assertItemsEqual(ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) + + func.reset_mock() + def slow_diff(content1, content2): + for i in range(1, 10): + time.sleep(5) + yield "line%s" % i + func.side_effect = slow_diff + self.assertFalse(ptool._diff(content1, content2, func), rv) + func.assert_called_with(["line1", "line2"], ["line3"]) diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py new file mode 100644 index 000000000..e663973c7 --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py @@ -0,0 +1,80 @@ +import os +import copy +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Hardlink import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_hardlink_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXHardlink(posix.logger, posix.setup, posix.config) + +class TestPOSIXHardlink(unittest.TestCase): + @patch("os.path.samefile") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + def test_verify(self, mock_verify, mock_samefile): + entry = lxml.etree.Element("Path", name="/test", type="hardlink", + to="/dest") + ptool = get_hardlink_object() + + mock_samefile.return_value = True + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_samefile.reset_mock() + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_samefile.reset_mock() + mock_verify.reset_mock() + mock_samefile.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_samefile.reset_mock() + mock_verify.reset_mock() + mock_samefile.side_effect = OSError + self.assertFalse(ptool.verify(entry, [])) + mock_samefile.assert_called_with(entry.get("name"), + entry.get("to")) + + @patch("os.link") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.Hardlink.POSIXHardlink._exists") + def test_install(self, mock_exists, mock_install, mock_link): + entry = lxml.etree.Element("Path", name="/test", type="hardlink", + to="/dest") + ptool = get_hardlink_object() + + mock_exists.return_value = False + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_link.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) + + mock_link.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_link.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_link.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py new file mode 100644 index 000000000..38f3b6ee3 --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py @@ -0,0 +1,88 @@ +import os +import copy +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Nonexistent import * +from Test__init import get_config, get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_nonexistent_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXNonexistent(posix.logger, posix.setup, posix.config) + +class TestPOSIXNonexistent(unittest.TestCase): + @patch("os.path.lexists") + def test_verify(self, mock_lexists): + entry = lxml.etree.Element("Path", name="/test", type="nonexistent") + ptool = get_nonexistent_object() + + for val in [True, False]: + mock_lexists.reset_mock() + mock_lexists.return_value = val + self.assertEqual(ptool.verify(entry, []), not val) + mock_lexists.assert_called_with(entry.get("name")) + + @patch("os.rmdir") + @patch("os.remove") + @patch("shutil.rmtree") + def test_install(self, mock_rmtree, mock_remove, mock_rmdir): + entry = lxml.etree.Element("Path", name="/test", type="nonexistent") + ptool = get_nonexistent_object() + + with patch("os.path.isdir") as mock_isdir: + def reset(): + mock_isdir.reset_mock() + mock_remove.reset_mock() + mock_rmdir.reset_mock() + mock_rmtree.reset_mock() + + mock_isdir.return_value = False + self.assertTrue(ptool.install(entry)) + mock_remove.assert_called_with(entry.get("name")) + + reset() + mock_remove.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_remove.assert_called_with(entry.get("name")) + + reset() + mock_isdir.return_value = True + self.assertTrue(ptool.install(entry)) + mock_rmdir.assert_called_with(entry.get("name")) + + reset() + mock_rmdir.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_rmdir.assert_called_with(entry.get("name")) + + reset() + entry.set("recursive", "true") + self.assertTrue(ptool.install(entry)) + mock_rmtree.assert_called_with(entry.get("name")) + + reset() + mock_rmtree.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_rmtree.assert_called_with(entry.get("name")) + + reset() + child_entry = lxml.etree.Element("Path", name="/test/foo", + type="nonexistent") + ptool = get_nonexistent_object(posix=get_posix_object(config=get_config([child_entry]))) + mock_rmtree.side_effect = None + self.assertTrue(ptool.install(entry)) + mock_rmtree.assert_called_with(entry.get("name")) + + reset() + child_entry = lxml.etree.Element("Path", name="/test/foo", + type="file") + ptool = get_nonexistent_object(posix=get_posix_object(config=get_config([child_entry]))) + mock_rmtree.side_effect = None + self.assertFalse(ptool.install(entry)) diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py new file mode 100644 index 000000000..94b74dd13 --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py @@ -0,0 +1,21 @@ +import os +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Permissions import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_permissions_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXPermissions(posix.logger, posix.setup, posix.config) + +class TestPOSIXPermissions(unittest.TestCase): + # nothing to test! + pass diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py new file mode 100644 index 000000000..a3ed9f68d --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py @@ -0,0 +1,76 @@ +import os +import copy +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +from Bcfg2.Client.Tools.POSIX.Symlink import * +from Test__init import get_posix_object + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_symlink_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXSymlink(posix.logger, posix.setup, posix.config) + +class TestPOSIXSymlink(unittest.TestCase): + @patch("os.readlink") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify") + def test_verify(self, mock_verify, mock_readlink): + entry = lxml.etree.Element("Path", name="/test", type="symlink", + to="/dest") + ptool = get_symlink_object() + + mock_readlink.return_value = entry.get("to") + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_readlink.reset_mock() + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_readlink.reset_mock() + mock_verify.reset_mock() + mock_readlink.return_value = "/bogus" + self.assertFalse(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + mock_verify.assert_called_with(ptool, entry, []) + + mock_readlink.reset_mock() + mock_verify.reset_mock() + mock_readlink.side_effect = OSError + self.assertFalse(ptool.verify(entry, [])) + mock_readlink.assert_called_with(entry.get("name")) + + @patch("os.symlink") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install") + @patch("Bcfg2.Client.Tools.POSIX.Symlink.POSIXSymlink._exists") + def test_install(self, mock_exists, mock_install, mock_symlink): + entry = lxml.etree.Element("Path", name="/test", type="symlink", + to="/dest") + ptool = get_symlink_object() + + mock_exists.return_value = False + mock_install.return_value = True + self.assertTrue(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_symlink.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) + + mock_symlink.reset_mock() + mock_exists.reset_mock() + mock_install.reset_mock() + mock_symlink.side_effect = OSError + self.assertFalse(ptool.install(entry)) + mock_exists.assert_called_with(entry, remove=True) + mock_symlink.assert_called_with(entry.get("to"), entry.get("name")) + mock_install.assert_called_with(ptool, entry) diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py new file mode 100644 index 000000000..952bb02dd --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py @@ -0,0 +1,238 @@ +import os +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +import Bcfg2.Client.Tools +import Bcfg2.Client.Tools.POSIX + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_config(entries): + config = lxml.etree.Element("Configuration") + bundle = lxml.etree.SubElement(config, "Bundle", name="test") + bundle.extend(entries) + return config + +def get_posix_object(logger=None, setup=None, config=None): + if config is None: + config = lxml.etree.Element("Configuration") + if not logger: + def print_msg(msg): + print(msg) + logger = Mock() + logger.error = Mock(side_effect=print_msg) + logger.warning = Mock(side_effect=print_msg) + logger.info = Mock(side_effect=print_msg) + logger.debug = Mock(side_effect=print_msg) + if not setup: + setup = MagicMock() + return Bcfg2.Client.Tools.POSIX.POSIX(logger, setup, config) + + +class TestPOSIX(unittest.TestCase): + def test__init(self): + entries = [lxml.etree.Element("Path", name="test", type="file")] + p = get_posix_object(config=get_config(entries)) + self.assertIsInstance(p, Bcfg2.Client.Tools.Tool) + self.assertIsInstance(p, Bcfg2.Client.Tools.POSIX.POSIX) + self.assertIn('Path', p.__req__) + self.assertGreater(len(p.__req__['Path']), 0) + self.assertGreater(len(p.__handles__), 0) + self.assertItemsEqual(p.handled, entries) + + @patch("Bcfg2.Client.Tools.Tool.canVerify") + def test_canVerify(self, mock_canVerify): + entry = lxml.etree.Element("Path", name="test", type="file") + p = get_posix_object() + + # first, test superclass canVerify failure + mock_canVerify.return_value = False + self.assertFalse(p.canVerify(entry)) + mock_canVerify.assert_called_with(p, entry) + + # next, test fully_specified failure + p.logger.error.reset_mock() + mock_canVerify.reset_mock() + mock_canVerify.return_value = True + mock_fully_spec = Mock() + mock_fully_spec.return_value = False + p._handlers[entry.get("type")].fully_specified = mock_fully_spec + self.assertFalse(p.canVerify(entry)) + mock_canVerify.assert_called_with(p, entry) + mock_fully_spec.assert_called_with(entry) + self.assertTrue(p.logger.error.called) + + # finally, test success + p.logger.error.reset_mock() + mock_canVerify.reset_mock() + mock_fully_spec.reset_mock() + mock_fully_spec.return_value = True + self.assertTrue(p.canVerify(entry)) + mock_canVerify.assert_called_with(p, entry) + mock_fully_spec.assert_called_with(entry) + self.assertFalse(p.logger.error.called) + + @patch("Bcfg2.Client.Tools.Tool.canInstall") + def test_canInstall(self, mock_canInstall): + entry = lxml.etree.Element("Path", name="test", type="file") + p = get_posix_object() + + # first, test superclass canInstall failure + mock_canInstall.return_value = False + self.assertFalse(p.canInstall(entry)) + mock_canInstall.assert_called_with(p, entry) + + # next, test fully_specified failure + p.logger.error.reset_mock() + mock_canInstall.reset_mock() + mock_canInstall.return_value = True + mock_fully_spec = Mock() + mock_fully_spec.return_value = False + p._handlers[entry.get("type")].fully_specified = mock_fully_spec + self.assertFalse(p.canInstall(entry)) + mock_canInstall.assert_called_with(p, entry) + mock_fully_spec.assert_called_with(entry) + self.assertTrue(p.logger.error.called) + + # finally, test success + p.logger.error.reset_mock() + mock_canInstall.reset_mock() + mock_fully_spec.reset_mock() + mock_fully_spec.return_value = True + self.assertTrue(p.canInstall(entry)) + mock_canInstall.assert_called_with(p, entry) + mock_fully_spec.assert_called_with(entry) + self.assertFalse(p.logger.error.called) + + def test_InstallPath(self): + entry = lxml.etree.Element("Path", name="test", type="file") + p = get_posix_object() + + mock_install = Mock() + mock_install.return_value = True + p._handlers[entry.get("type")].install = mock_install + self.assertTrue(p.InstallPath(entry)) + mock_install.assert_called_with(entry) + + def test_VerifyPath(self): + entry = lxml.etree.Element("Path", name="test", type="file") + modlist = [] + p = get_posix_object() + + mock_verify = Mock() + mock_verify.return_value = True + p._handlers[entry.get("type")].verify = mock_verify + self.assertTrue(p.VerifyPath(entry, modlist)) + mock_verify.assert_called_with(entry, modlist) + + mock_verify.reset_mock() + mock_verify.return_value = False + p.setup.__getitem__.return_value = True + self.assertFalse(p.VerifyPath(entry, modlist)) + self.assertIsNotNone(entry.get('qtext')) + + @patch('os.remove') + def test_prune_old_backups(self, mock_remove): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + setup = dict(ppath='/', max_copies=5, paranoid=True) + posix = get_posix_object(setup=setup) + + remove = ["_etc_foo_2012-07-20T04:13:22.364989", + "_etc_foo_2012-07-31T04:13:23.894958", + "_etc_foo_2012-07-17T04:13:22.493316",] + keep = ["_etc_foo_bar_2011-08-07T04:13:22.519978", + "_etc_foo_2012-08-04T04:13:22.519978", + "_etc_Foo_2011-08-07T04:13:22.519978", + "_etc_foo_2012-08-06T04:13:22.519978", + "_etc_foo_2012-08-03T04:13:22.191895", + "_etc_test_2011-08-07T04:13:22.519978", + "_etc_foo_2012-08-07T04:13:22.519978",] + + with patch('os.listdir') as mock_listdir: + mock_listdir.side_effect = OSError + posix._prune_old_backups(entry) + self.assertTrue(posix.logger.error.called) + self.assertFalse(mock_remove.called) + mock_listdir.assert_called_with(setup['ppath']) + + mock_listdir.reset_mock() + mock_remove.reset_mock() + mock_listdir.side_effect = None + mock_listdir.return_value = keep + remove + + posix._prune_old_backups(entry) + mock_listdir.assert_called_with(setup['ppath']) + self.assertItemsEqual(mock_remove.call_args_list, + [call(os.path.join(setup['ppath'], p)) + for p in remove]) + + mock_listdir.reset_mock() + mock_remove.reset_mock() + mock_remove.side_effect = OSError + posix.logger.error.reset_mock() + # test to ensure that we call os.remove() for all files that + # need to be removed even if we get an error + posix._prune_old_backups(entry) + mock_listdir.assert_called_with(setup['ppath']) + self.assertItemsEqual(mock_remove.call_args_list, + [call(os.path.join(setup['ppath'], p)) + for p in remove]) + self.assertTrue(posix.logger.error.called) + + @patch("shutil.copy") + @patch("Bcfg2.Client.Tools.POSIX.POSIX._prune_old_backups") + def test_paranoid_backup(self, mock_prune, mock_copy): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + setup = dict(ppath='/', max_copies=5, paranoid=False) + posix = get_posix_object(setup=setup) + + # paranoid false globally + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + + # paranoid false on the entry + mock_prune.reset_mock() + setup['paranoid'] = True + posix = get_posix_object(setup=setup) + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + + # entry does not exist on filesystem + mock_prune.reset_mock() + entry.set("paranoid", "true") + entry.set("current_exists", "false") + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + + with patch("os.path.isdir") as mock_isdir: + # entry is a directory on the filesystem + mock_prune.reset_mock() + entry.set("current_exists", "true") + mock_isdir.return_value = True + posix._paranoid_backup(entry) + self.assertFalse(mock_prune.called) + self.assertFalse(mock_copy.called) + mock_isdir.assert_called_with(entry.get("name")) + + # test the actual backup now + mock_prune.reset_mock() + mock_isdir.return_value = False + posix._paranoid_backup(entry) + mock_isdir.assert_called_with(entry.get("name")) + mock_prune.assert_called_with(entry) + # it's basically impossible to test the shutil.copy() call + # exactly because the destination includes microseconds, + # so we just test it good enough + self.assertEqual(mock_copy.call_args[0][0], + entry.get("name")) + bkupnam = os.path.join(setup['ppath'], + entry.get('name').replace('/', '_')) + '_' + self.assertEqual(bkupnam, + mock_copy.call_args[0][1][:len(bkupnam)]) diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py new file mode 100644 index 000000000..68007ca8e --- /dev/null +++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py @@ -0,0 +1,966 @@ +import os +import copy +import stat +import unittest +import lxml.etree +from mock import Mock, MagicMock, patch +import Bcfg2.Client.Tools +from Bcfg2.Client.Tools.POSIX.base import * +from Test__init import get_posix_object + +try: + import selinux + has_selinux = True +except ImportError: + has_selinux = False + +try: + import posix1e + has_acls = True +except ImportError: + has_acls = False + +def call(*args, **kwargs): + """ the Mock call object is a fairly recent addition, but it's + very very useful, so we create our own function to create Mock + calls """ + return (args, kwargs) + +def get_posixtool_object(posix=None): + if posix is None: + posix = get_posix_object() + return POSIXTool(posix.logger, posix.setup, posix.config) + +class TestPOSIXTool(unittest.TestCase): + def test_fully_specified(self): + # fully_specified should do no checking on the abstract + # POSIXTool object + ptool = get_posixtool_object() + self.assertTrue(ptool.fully_specified(Mock())) + + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._verify_metadata") + def test_verify(self, mock_verify): + entry = lxml.etree.Element("Path", name="/test", type="file") + ptool = get_posixtool_object() + with patch('os.stat') as mock_stat, patch('os.walk') as mock_walk: + mock_stat.return_value = MagicMock() + + mock_verify.return_value = False + self.assertFalse(ptool.verify(entry, [])) + mock_verify.assert_called_with(entry) + + mock_verify.reset_mock() + mock_verify.return_value = True + self.assertTrue(ptool.verify(entry, [])) + mock_verify.assert_called_with(entry) + + mock_verify.reset_mock() + entry.set("recursive", "true") + walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), + ("/dir1", ["dir3"], []), + ("/dir2", [], ["file3", "file4"])] + mock_walk.return_value = walk_rv + self.assertTrue(ptool.verify(entry, [])) + mock_walk.assert_called_with(entry.get("name")) + all_verifies = [call(entry)] + for root, dirs, files in walk_rv: + all_verifies.extend([call(entry, path=os.path.join(root, p)) + for p in dirs + files]) + self.assertItemsEqual(mock_verify.call_args_list, all_verifies) + + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_perms") + def test_install(self, mock_set_perms): + entry = lxml.etree.Element("Path", name="/test", type="file") + ptool = get_posixtool_object() + + mock_set_perms.return_value = True + self.assertTrue(ptool.install(entry)) + mock_set_perms.assert_called_with(entry) + + mock_set_perms.reset_mock() + entry.set("recursive", "true") + with patch('os.walk') as mock_walk: + walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]), + ("/dir1", ["dir3"], []), + ("/dir2", [], ["file3", "file4"])] + mock_walk.return_value = walk_rv + + mock_set_perms.return_value = True + self.assertTrue(ptool.install(entry)) + mock_walk.assert_called_with(entry.get("name")) + all_set_perms = [call(entry)] + for root, dirs, files in walk_rv: + all_set_perms.extend([call(entry, + path=os.path.join(root, p)) + for p in dirs + files]) + self.assertItemsEqual(mock_set_perms.call_args_list, + all_set_perms) + + mock_walk.reset_mock() + mock_set_perms.reset_mock() + + def set_perms_rv(entry, path=None): + if path == '/dir2/file3': + return False + else: + return True + mock_set_perms.side_effect = set_perms_rv + + self.assertFalse(ptool.install(entry)) + mock_walk.assert_called_with(entry.get("name")) + self.assertItemsEqual(mock_set_perms.call_args_list, + all_set_perms) + + @patch("os.unlink") + @patch("shutil.rmtree") + def test_exists(self, mock_rmtree, mock_unlink): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + ptool = get_posixtool_object() + with patch('os.lstat') as mock_lstat, \ + patch("os.path.isdir") as mock_isdir: + mock_lstat.side_effect = OSError + self.assertFalse(ptool._exists(entry)) + mock_lstat.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_lstat.reset_mock() + mock_unlink.reset_mock() + rv = MagicMock() + mock_lstat.return_value = rv + mock_lstat.side_effect = None + self.assertEqual(ptool._exists(entry), rv) + mock_lstat.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_lstat.reset_mock() + mock_unlink.reset_mock() + mock_isdir.return_value = False + self.assertFalse(ptool._exists(entry, remove=True)) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_unlink.assert_called_with(entry.get('name')) + self.assertFalse(mock_rmtree.called) + + mock_lstat.reset_mock() + mock_isdir.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_isdir.return_value = True + self.assertFalse(ptool._exists(entry, remove=True)) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_rmtree.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + mock_isdir.reset_mock() + mock_lstat.reset_mock() + mock_unlink.reset_mock() + mock_rmtree.reset_mock() + mock_rmtree.side_effect = OSError + self.assertEqual(ptool._exists(entry, remove=True), rv) + mock_isdir.assert_called_with(entry.get('name')) + mock_lstat.assert_called_with(entry.get('name')) + mock_rmtree.assert_called_with(entry.get('name')) + self.assertFalse(mock_unlink.called) + + @patch("os.chown") + @patch("os.chmod") + @patch("os.utime") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_uid") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_gid") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_acls") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_secontext") + def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid, + mock_norm_uid, mock_utime, mock_chmod, mock_chown): + ptool = get_posixtool_object() + + def reset(): + mock_set_secontext.reset_mock() + mock_set_acls.reset_mock() + mock_norm_gid.reset_mock() + mock_norm_uid.reset_mock() + mock_chmod.reset_mock() + mock_chown.reset_mock() + mock_utime.reset_mock() + + entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar", + type="symlink") + mock_set_acls.return_value = True + mock_set_secontext.return_value = True + self.assertTrue(ptool._set_perms(entry)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner", + group="group", perms="644", type="file") + mock_norm_uid.return_value = 10 + mock_norm_gid.return_value = 100 + + reset() + self.assertTrue(ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + self.assertFalse(mock_utime.called) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + reset() + mtime = 1344459042 + entry.set("mtime", str(mtime)) + self.assertTrue(ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + reset() + self.assertTrue(ptool._set_perms(entry, path='/etc/bar')) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with('/etc/bar', 10, 100) + mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path='/etc/bar') + mock_set_acls.assert_called_with(entry, path='/etc/bar') + + # test dev_type modification of perms, failure of chown + reset() + def chown_rv(path, owner, group): + if owner == 0 and group == 0: + return True + else: + raise KeyError + os.chown.side_effect = chown_rv + entry.set("type", "device") + entry.set("dev_type", device_map.keys()[0]) + self.assertFalse(ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 0, 0) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8) | device_map.values()[0]) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + # test failure of chmod + reset() + os.chown.side_effect = None + os.chmod.side_effect = OSError + entry.set("type", "file") + del entry.attrib["dev_type"] + self.assertFalse(ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + # test that even when everything fails, we try to do it all. + # e.g., when chmod fails, we still try to apply acls, set + # selinux context, etc. + reset() + os.chown.side_effect = OSError + os.utime.side_effect = OSError + mock_set_acls.return_value = False + mock_set_secontext.return_value = False + self.assertFalse(ptool._set_perms(entry)) + mock_norm_uid.assert_called_with(entry) + mock_norm_gid.assert_called_with(entry) + mock_chown.assert_called_with(entry.get("name"), 10, 100) + mock_chmod.assert_called_with(entry.get("name"), + int(entry.get("perms"), 8)) + mock_utime.assert_called_with(entry.get("name"), (mtime, mtime)) + mock_set_secontext.assert_called_with(entry, path=entry.get("name")) + mock_set_acls.assert_called_with(entry, path=entry.get("name")) + + @unittest.skipUnless(has_acls, "ACLS not found, skipping") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_uid") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_gid") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_entry_acls") + def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + ptool = get_posixtool_object() + + # disable acls for the initial test + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertTrue(ptool._set_acls(entry)) + Bcfg2.Client.Tools.POSIX.base.has_acls = True + + # build a set of file ACLs to return from posix1e.ACL(file=...) + file_acls = [] + acl = Mock() + acl.tag_type = posix1e.ACL_USER + acl.name = "remove" + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_GROUP + acl.name = "remove" + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_MASK + acl.name = "keep" + file_acls.append(acl) + remove_acls = [a for a in file_acls if a.name == "remove"] + + # build a set of ACLs listed on the entry as returned by + # _list_entry_acls() + entry_acls = {("default", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5} + mock_list_entry_acls.return_value = entry_acls + mock_norm_uid.return_value = 10 + mock_norm_gid.return_value = 100 + + with patch("posix1e.ACL") as mock_ACL, \ + patch("posix1e.Entry") as mock_Entry, \ + patch("os.path.isdir") as mock_isdir: + # set up the unreasonably complex return value for + # posix1e.ACL(), which has three separate uses + fileacl_rv = MagicMock() + fileacl_rv.valid.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv = MagicMock() + filedef_rv.valid.return_value = True + filedef_rv.__iter__.return_value = iter(file_acls) + acl_rv = MagicMock() + def mock_acl_rv(file=None, filedef=None, acl=None): + if file: + return fileacl_rv + elif filedef: + return filedef_rv + elif acl: + return acl_rv + + # set up the equally unreasonably complex return value for + # posix1e.Entry, which returns a new entry and adds it to + # an ACL, so we have to track the Mock objects it returns. + # why can't they just have an acl.add_entry() method?!? + acl_entries = [] + def mock_entry_rv(acl): + rv = MagicMock() + rv.acl = acl + rv.permset = set() + acl_entries.append(rv) + return rv + mock_Entry.side_effect = mock_entry_rv + + def reset(): + mock_isdir.reset_mock() + mock_ACL.reset_mock() + mock_Entry.reset_mock() + fileacl_rv.reset_mock() + + # test fs mounted noacl + mock_ACL.side_effect = IOError(95, "Operation not permitted") + self.assertFalse(ptool._set_acls(entry)) + + # test other error + reset() + mock_ACL.side_effect = IOError + self.assertFalse(ptool._set_acls(entry)) + + reset() + mock_ACL.side_effect = mock_acl_rv + mock_isdir.return_value = True + self.assertTrue(ptool._set_acls(entry)) + self.assertItemsEqual(mock_ACL.call_args_list, + [call(file=entry.get("name")), + call(filedef=entry.get("name"))]) + self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + self.assertItemsEqual(filedef_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + mock_list_entry_acls.assert_called_with(entry) + mock_norm_uid.assert_called_with("user") + mock_norm_gid.assert_called_with("group") + fileacl_rv.calc_mask.assert_any_call() + fileacl_rv.applyto.assert_called_with(entry.get("name"), + posix1e.ACL_TYPE_ACCESS) + filedef_rv.calc_mask.assert_any_call() + filedef_rv.applyto.assert_called_with(entry.get("name"), + posix1e.ACL_TYPE_DEFAULT) + + # build tuples of the Entry objects that were added to acl + # and defaacl so they're easier to compare for equality + added_acls = [] + for acl in acl_entries: + added_acls.append((acl.acl, acl.tag_type, acl.qualifier, + sum(acl.permset))) + self.assertItemsEqual(added_acls, + [(filedef_rv, posix1e.ACL_USER, 10, 7), + (fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) + + reset() + # have to reassign these because they're iterators, and + # they've already been iterated over once + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv.__iter__.return_value = iter(file_acls) + mock_list_entry_acls.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + mock_isdir.return_value = False + acl_entries = [] + self.assertTrue(ptool._set_acls(entry, path="/bin/bar")) + mock_ACL.assert_called_with(file="/bin/bar") + self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list, + [call(a) for a in remove_acls]) + mock_list_entry_acls.assert_called_with(entry) + mock_norm_gid.assert_called_with("group") + fileacl_rv.calc_mask.assert_any_call() + fileacl_rv.applyto.assert_called_with("/bin/bar", + posix1e.ACL_TYPE_ACCESS) + + added_acls = [] + for acl in acl_entries: + added_acls.append((acl.acl, acl.tag_type, acl.qualifier, + sum(acl.permset))) + self.assertItemsEqual(added_acls, + [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)]) + + @unittest.skipUnless(has_selinux, "SELinux not found, skipping") + def test_set_secontext(self): + entry = lxml.etree.Element("Path", name="/etc/foo", type="file") + ptool = get_posixtool_object() + + # disable selinux for the initial test + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + self.assertTrue(ptool._set_secontext(entry)) + Bcfg2.Client.Tools.POSIX.base.has_selinux = True + + with patch("selinux.restorecon") as mock_restorecon, \ + patch("selinux.lsetfilecon") as mock_lsetfilecon: + # no context given + self.assertTrue(ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + self.assertFalse(mock_lsetfilecon.called) + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + entry.set("secontext", "__default__") + self.assertTrue(ptool._set_secontext(entry)) + mock_restorecon.assert_called_with(entry.get("name")) + self.assertFalse(mock_lsetfilecon.called) + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + mock_lsetfilecon.return_value = 0 + entry.set("secontext", "foo_t") + self.assertTrue(ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") + + mock_restorecon.reset_mock() + mock_lsetfilecon.reset_mock() + mock_lsetfilecon.return_value = 1 + self.assertFalse(ptool._set_secontext(entry)) + self.assertFalse(mock_restorecon.called) + mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t") + + @patch("grp.getgrnam") + def test_norm_gid(self, mock_getgrnam): + ptool = get_posixtool_object() + self.assertEqual(5, ptool._norm_gid("5")) + self.assertFalse(mock_getgrnam.called) + + mock_getgrnam.reset_mock() + mock_getgrnam.return_value = ("group", "x", 5, []) + self.assertEqual(5, ptool._norm_gid("group")) + mock_getgrnam.assert_called_with("group") + + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_gid") + def test_norm_entry_gid(self, mock_norm_gid): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user") + ptool = get_posixtool_object() + mock_norm_gid.return_value = 10 + self.assertEqual(10, ptool._norm_entry_gid(entry)) + mock_norm_gid.assert_called_with(entry.get("group")) + + mock_norm_gid.reset_mock() + mock_norm_gid.side_effect = KeyError + self.assertEqual(0, ptool._norm_entry_gid(entry)) + mock_norm_gid.assert_called_with(entry.get("group")) + + @patch("pwd.getpwnam") + def test_norm_uid(self, mock_getpwnam): + ptool = get_posixtool_object() + self.assertEqual(5, ptool._norm_uid("5")) + self.assertFalse(mock_getpwnam.called) + + mock_getpwnam.reset_mock() + mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user", + "/bin/zsh") + self.assertEqual(5, ptool._norm_uid("user")) + mock_getpwnam.assert_called_with("user") + + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_uid") + def test_norm_entry_uid(self, mock_norm_uid): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user") + ptool = get_posixtool_object() + mock_norm_uid.return_value = 10 + self.assertEqual(10, ptool._norm_entry_uid(entry)) + mock_norm_uid.assert_called_with(entry.get("owner")) + + mock_norm_uid.reset_mock() + mock_norm_uid.side_effect = KeyError + self.assertEqual(0, ptool._norm_entry_uid(entry)) + mock_norm_uid.assert_called_with(entry.get("owner")) + + def test_norm_acl_perms(self): + ptool = get_posixtool_object() + # there's basically no reasonably way to test the Permset + # object parsing feature without writing our own Mock object + # that re-implements Permset.test(). silly pylibacl won't let + # us create standalone Entry or Permset objects. + ptool = get_posixtool_object() + self.assertEqual(5, ptool._norm_acl_perms("5")) + self.assertEqual(0, ptool._norm_acl_perms("55")) + self.assertEqual(5, ptool._norm_acl_perms("rx")) + self.assertEqual(5, ptool._norm_acl_perms("r-x")) + self.assertEqual(6, ptool._norm_acl_perms("wr-")) + self.assertEqual(0, ptool._norm_acl_perms("rwrw")) + self.assertEqual(0, ptool._norm_acl_perms("-")) + self.assertEqual(0, ptool._norm_acl_perms("a")) + self.assertEqual(6, ptool._norm_acl_perms("rwa")) + self.assertEqual(4, ptool._norm_acl_perms("rr")) + + def test__gather_data(self): + path = '/test' + ptool = get_posixtool_object() + + # have to use context manager version of patch here because + # os.stat must be unpatched when we instantiate the object to + # make pkgutil.walk_packages() work + with patch('os.stat') as mock_stat: + mock_stat.side_effect = OSError + self.assertFalse(ptool._gather_data(path)[0]) + mock_stat.assert_called_with(path) + + mock_stat.reset_mock() + mock_stat.side_effect = None + # create a return value + stat_rv = MagicMock() + def stat_getitem(key): + if int(key) == stat.ST_UID: + return 0 + elif int(key) == stat.ST_GID: + return 10 + elif int(key) == stat.ST_MODE: + # return extra bits in the mode to emulate a device + # and ensure that they're stripped + return int('060660', 8) + stat_rv.__getitem__ = Mock(side_effect=stat_getitem) + mock_stat.return_value = stat_rv + + # disable selinux and acls for this call -- we test them + # separately so that we can skip those tests as appropriate + states = (Bcfg2.Client.Tools.POSIX.base.has_selinux, + Bcfg2.Client.Tools.POSIX.base.has_acls) + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertEqual(ptool._gather_data(path), + (stat_rv, '0', '10', '0660', None, None)) + Bcfg2.Client.Tools.POSIX.base.has_selinux, \ + Bcfg2.Client.Tools.POSIX.base.has_acls = states + mock_stat.assert_called_with(path) + + @unittest.skipUnless(has_selinux, "SELinux not found, skipping") + def test__gather_data_selinux(self): + context = 'system_u:object_r:root_t:s0' + path = '/test' + ptool = get_posixtool_object() + with patch("selinux.getfilecon") as mock_getfilecon, \ + patch('os.stat') as mock_stat: + mock_getfilecon.return_value = [len(context) + 1, context] + mock_stat.return_value = MagicMock() + # disable acls for this call and test them separately + state = Bcfg2.Client.Tools.POSIX.base.has_acls + Bcfg2.Client.Tools.POSIX.base.has_acls = False + self.assertEqual(ptool._gather_data(path)[4], 'root_t') + Bcfg2.Client.Tools.POSIX.base.has_acls = state + mock_getfilecon.assert_called_with(path) + + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_file_acls") + @unittest.skipUnless(has_acls, "ACLS not found, skipping") + def test__gather_data_acls(self, mock_list_file_acls): + acls = {("default", posix1e.ACL_USER, "testuser"): "rwx", + ("access", posix1e.ACL_GROUP, "testgroup"): "rx"} + mock_list_file_acls.return_value = acls + path = '/test' + ptool = get_posixtool_object() + with patch('os.stat') as mock_stat: + mock_stat.return_value = MagicMock() + # disable selinux for this call and test it separately + state = Bcfg2.Client.Tools.POSIX.base.has_selinux + Bcfg2.Client.Tools.POSIX.base.has_selinux = False + self.assertItemsEqual(ptool._gather_data(path)[5], acls) + Bcfg2.Client.Tools.POSIX.base.has_selinux = state + mock_list_file_acls.assert_called_with(path) + + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._verify_acls") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._gather_data") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_uid") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_gid") + def test_verify_metadata(self, mock_norm_gid, mock_norm_uid, + mock_gather_data, mock_verify_acls): + entry = lxml.etree.Element("Path", name="/test", type="file", + group="group", owner="user", perms="664", + secontext='etc_t') + # _verify_metadata() mutates the entry, so we keep a backup so we + # can start fresh every time + orig_entry = copy.deepcopy(entry) + + ptool = get_posixtool_object() + + mock_gather_data.return_value = (False, None, None, None, None, None) + self.assertFalse(ptool._verify_metadata(entry)) + self.assertEqual(entry.get("current_exists", "").lower(), "false") + mock_gather_data.assert_called_with(entry.get("name")) + + # expected data. tuple of attr, return value index, value + expected = [('current_owner', 1, '0'), + ('current_group', 2, '10'), + ('current_perms', 3, '0664'), + ('current_secontext', 4, 'etc_t')] + mock_norm_uid.return_value = 0 + mock_norm_gid.return_value = 10 + gather_data_rv = [MagicMock(), None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + + entry = copy.deepcopy(orig_entry) + mock_gather_data.reset_mock() + mock_gather_data.return_value = tuple(gather_data_rv) + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + self.assertTrue(ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + + mtime = 1344430414 + entry = copy.deepcopy(orig_entry) + entry.set("mtime", str(mtime)) + mock_gather_data.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + stat_rv = MagicMock() + stat_rv.__getitem__.return_value = mtime + gather_data_rv[0] = stat_rv + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + # failure modes for each checked datum. tuple of changed attr, + # return value index, new (failing) value + failures = [('current_owner', 1, '10'), + ('current_group', 2, '100'), + ('current_perms', 3, '0660')] + if has_selinux: + failures.append(('current_secontext', 4, 'root_t')) + + for fail_attr, fail_idx, fail_val in failures: + entry = copy.deepcopy(orig_entry) + entry.set("mtime", str(mtime)) + mock_gather_data.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + gather_data_rv = [stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + gather_data_rv[fail_idx] = fail_val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertFalse(ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + self.assertEqual(entry.get(fail_attr), fail_val) + for attr, idx, val in expected: + if attr != fail_attr: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + # failure mode for mtime + fail_mtime = 1344431162 + entry = copy.deepcopy(orig_entry) + entry.set("mtime", str(mtime)) + mock_gather_data.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + fail_stat_rv = MagicMock() + fail_stat_rv.__getitem__.return_value = fail_mtime + gather_data_rv = [fail_stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertFalse(ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, path=entry.get("name")) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(fail_mtime)) + + if has_selinux: + # test success and failure for __default__ secontext + entry = copy.deepcopy(orig_entry) + entry.set("mtime", str(mtime)) + entry.set("secontext", "__default__") + mock_gather_data.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + with patch("selinux.matchpathcon") as mock_matchpathcon: + context1 = "system_u:object_r:etc_t:s0" + context2 = "system_u:object_r:root_t:s0" + mock_matchpathcon.return_value = [1 + len(context1), + context1] + gather_data_rv = [stat_rv, None, None, None, None, []] + for attr, idx, val in expected: + gather_data_rv[idx] = val + mock_gather_data.return_value = tuple(gather_data_rv) + self.assertTrue(ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, + path=entry.get("name")) + mock_matchpathcon.assert_called_with(entry.get("name"), 0) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + entry = copy.deepcopy(orig_entry) + entry.set("mtime", str(mtime)) + entry.set("secontext", "__default__") + mock_gather_data.reset_mock() + mock_norm_uid.reset_mock() + mock_norm_gid.reset_mock() + mock_matchpathcon.return_value = [1 + len(context2), + context2] + self.assertFalse(ptool._verify_metadata(entry)) + mock_gather_data.assert_called_with(entry.get("name")) + mock_verify_acls.assert_called_with(entry, + path=entry.get("name")) + mock_matchpathcon.assert_called_with(entry.get("name"), 0) + self.assertEqual(entry.get("current_exists", 'true'), 'true') + for attr, idx, val in expected: + self.assertEqual(entry.get(attr), val) + self.assertEqual(entry.get("current_mtime"), str(mtime)) + + @unittest.skipUnless(has_acls, "ACLS not found, skipping") + def test_list_entry_acls(self): + entry = lxml.etree.Element("Path", name="/test", type="file") + lxml.etree.SubElement(entry, "ACL", scope="user", type="default", + user="user", perms="rwx") + lxml.etree.SubElement(entry, "ACL", scope="group", type="access", + group="group", perms="5") + ptool = get_posixtool_object() + self.assertItemsEqual(ptool._list_entry_acls(entry), + {("default", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5}) + + @unittest.skipUnless(has_acls, "ACLS not found, skipping") + @patch("pwd.getpwuid") + @patch("grp.getgrgid") + def test_list_file_acls(self, mock_getgrgid, mock_getpwuid): + path = '/test' + ptool = get_posixtool_object() + with patch("posix1e.ACL") as mock_ACL, \ + patch("os.path.isdir") as mock_isdir: + # build a set of file ACLs to return from posix1e.ACL(file=...) + file_acls = [] + acl = Mock() + acl.tag_type = posix1e.ACL_USER + acl.qualifier = 10 + # yes, this is a bogus permset. thanks to _norm_acl_perms + # it works and is easier than many of the alternatives. + acl.permset = 'rwx' + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_GROUP + acl.qualifier = 100 + acl.permset = 'rx' + file_acls.append(acl) + acl = Mock() + acl.tag_type = posix1e.ACL_MASK + file_acls.append(acl) + acls = {("access", posix1e.ACL_USER, "user"): 7, + ("access", posix1e.ACL_GROUP, "group"): 5} + + # set up the unreasonably complex return value for + # posix1e.ACL(), which has two separate uses + fileacl_rv = MagicMock() + fileacl_rv.valid.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv = MagicMock() + filedef_rv.valid.return_value = True + filedef_rv.__iter__.return_value = iter(file_acls) + def mock_acl_rv(file=None, filedef=None): + if file: + return fileacl_rv + elif filedef: + return filedef_rv + # other return values + mock_isdir.return_value = False + mock_getgrgid.return_value = ("group", "x", 5, []) + mock_getpwuid.return_value = ("user", "x", 5, 5, "User", + "/home/user", "/bin/zsh") + + def reset(): + mock_isdir.reset_mock() + mock_getgrgid.reset_mock() + mock_getpwuid.reset_mock() + mock_ACL.reset_mock() + + mock_ACL.side_effect = IOError(95, "Operation not supported") + self.assertItemsEqual(ptool._list_file_acls(path), dict()) + + reset() + mock_ACL.side_effect = IOError + self.assertItemsEqual(ptool._list_file_acls(path), dict()) + + reset() + mock_ACL.side_effect = mock_acl_rv + self.assertItemsEqual(ptool._list_file_acls(path), acls) + mock_isdir.assert_called_with(path) + mock_getgrgid.assert_called_with(100) + mock_getpwuid.assert_called_with(10) + mock_ACL.assert_called_with(file=path) + + reset() + mock_isdir.return_value = True + fileacl_rv.__iter__.return_value = iter(file_acls) + filedef_rv.__iter__.return_value = iter(file_acls) + + defacls = acls + for akey, perms in acls.items(): + defacls[('default', akey[1], akey[2])] = perms + self.assertItemsEqual(ptool._list_file_acls(path), defacls) + mock_isdir.assert_called_with(path) + self.assertItemsEqual(mock_getgrgid.call_args_list, + [call(100), call(100)]) + self.assertItemsEqual(mock_getpwuid.call_args_list, + [call(10), call(10)]) + self.assertItemsEqual(mock_ACL.call_args_list, + [call(file=path), call(filedef=path)]) + + @unittest.skipUnless(has_acls, "ACLS not found, skipping") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_file_acls") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_entry_acls") + def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls): + entry = lxml.etree.Element("Path", name="/test", type="file") + ptool = get_posixtool_object() + # we can't test to make sure that errors get properly sorted + # into (missing, extra, wrong) without refactoring the + # _verify_acls code, and I don't feel like doing that, so eff + # it. let's just test to make sure that failures are + # identified at all for now. + + acls = {("access", posix1e.ACL_USER, "user"): 7, + ("default", posix1e.ACL_GROUP, "group"): 5} + extra_acls = copy.deepcopy(acls) + extra_acls[("access", posix1e.ACL_USER, "user2")] = 4 + + mock_list_entry_acls.return_value = acls + mock_list_file_acls.return_value = acls + self.assertTrue(ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test missing + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_file_acls.return_value = extra_acls + self.assertFalse(ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test extra + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_entry_acls.return_value = extra_acls + mock_list_file_acls.return_value = acls + self.assertFalse(ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + # test wrong + wrong_acls = copy.deepcopy(extra_acls) + wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5 + mock_list_entry_acls.reset_mock() + mock_list_file_acls.reset_mock() + mock_list_entry_acls.return_value = extra_acls + mock_list_file_acls.return_value = wrong_acls + self.assertFalse(ptool._verify_acls(entry)) + mock_list_entry_acls.assert_called_with(entry) + mock_list_file_acls.assert_called_with(entry.get("name")) + + @patch("os.makedirs") + @patch("os.path.exists") + @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_perms") + def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs): + entry = lxml.etree.Element("Path", name="/test/foo/bar", + type="directory") + + def reset(): + mock_exists.reset_mock() + mock_set_perms.reset_mock() + mock_makedirs.reset_mock() + + ptool = get_posixtool_object() + mock_set_perms.return_value = True + def path_exists_rv(path): + if path == "/test": + return True + else: + return False + mock_exists.side_effect = path_exists_rv + self.assertTrue(ptool._makedirs(entry)) + self.assertItemsEqual(mock_exists.call_args_list, + [call("/test"), call("/test/foo"), + call("/test/foo/bar")]) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + mock_makedirs.assert_called_with(entry.get("name")) + + reset() + mock_makedirs.side_effect = OSError + self.assertFalse(ptool._makedirs(entry)) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + + reset() + mock_makedirs.side_effect = None + def set_perms_rv(entry, path=None): + if path == '/test/foo': + return False + else: + return True + mock_set_perms.side_effect = set_perms_rv + self.assertFalse(ptool._makedirs(entry)) + self.assertItemsEqual(mock_exists.call_args_list, + [call("/test"), call("/test/foo"), + call("/test/foo/bar")]) + self.assertItemsEqual(mock_set_perms.call_args_list, + [call(entry, path="/test/foo"), + call(entry, path="/test/foo/bar")]) + mock_makedirs.assert_called_with(entry.get("name")) |