summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-15 09:06:43 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-15 09:06:43 -0400
commitb862090945322d5ba4b42e180bba92afb860df21 (patch)
tree1c482adfa9561bad14d82fc442f8f319b33b1d4f
parent7890fd0aa5331541c71b893c313553765ca1628e (diff)
downloadbcfg2-b862090945322d5ba4b42e180bba92afb860df21.tar.gz
bcfg2-b862090945322d5ba4b42e180bba92afb860df21.tar.bz2
bcfg2-b862090945322d5ba4b42e180bba92afb860df21.zip
POSIX:
refactored POSIX tool into multiple files to make it more manageable Added unit tests for POSIX tool and sub-tools fixed ACL handling for filesystems mounted noacl
-rw-r--r--schemas/types.xsd1
-rw-r--r--src/lib/Bcfg2/Client/Frame.py2
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX.py1239
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/Device.py62
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/Directory.py86
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/File.py219
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py39
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py41
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py7
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py42
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/__init__.py147
-rw-r--r--src/lib/Bcfg2/Client/Tools/POSIX/base.py639
-rw-r--r--src/lib/Bcfg2/Client/Tools/__init__.py49
-rw-r--r--src/lib/Bcfg2/Options.py2
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py139
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py154
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py318
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py80
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py88
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py21
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py76
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py238
-rw-r--r--testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py966
23 files changed, 3396 insertions, 1259 deletions
diff --git a/schemas/types.xsd b/schemas/types.xsd
index 83cc2c9ee..edbc8ad37 100644
--- a/schemas/types.xsd
+++ b/schemas/types.xsd
@@ -140,7 +140,6 @@
<xsd:attribute type="DeviceTypeEnum" name="dev_type"/>
<xsd:attribute type="xsd:integer" name="major"/>
<xsd:attribute type="xsd:integer" name="minor"/>
- <xsd:attribute type="xsd:string" name="mode"/>
<xsd:attribute type="xsd:string" name="perms"/>
<xsd:attribute type="xsd:string" name="owner"/>
<xsd:attribute type="xsd:string" name="group"/>
diff --git a/src/lib/Bcfg2/Client/Frame.py b/src/lib/Bcfg2/Client/Frame.py
index b456738d0..bfdc90d38 100644
--- a/src/lib/Bcfg2/Client/Frame.py
+++ b/src/lib/Bcfg2/Client/Frame.py
@@ -84,7 +84,7 @@ class Frame:
self.whitelist = []
self.blacklist = []
self.removal = []
- self.logger = logging.getLogger("Bcfg2.Client.Frame")
+ self.logger = logging.getLogger(__name__)
for driver in drivers[:]:
if driver not in Bcfg2.Client.Tools.drivers and \
isinstance(driver, str):
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX.py b/src/lib/Bcfg2/Client/Tools/POSIX.py
deleted file mode 100644
index 64ea1b3e8..000000000
--- a/src/lib/Bcfg2/Client/Tools/POSIX.py
+++ /dev/null
@@ -1,1239 +0,0 @@
-"""All POSIX Type client support for Bcfg2."""
-
-import binascii
-from datetime import datetime
-import difflib
-import errno
-import grp
-import logging
-import os
-import pwd
-import shutil
-import stat
-import sys
-import time
-# py3k compatibility
-if sys.hexversion >= 0x03000000:
- unicode = str
-
-import Bcfg2.Client.Tools
-import Bcfg2.Options
-from Bcfg2.Client import XML
-
-log = logging.getLogger(__name__)
-
-try:
- import selinux
- has_selinux = True
-except ImportError:
- has_selinux = False
-
-try:
- import posix1e
- has_acls = True
-except ImportError:
- has_acls = False
-
-
-# map between dev_type attribute and stat constants
-device_map = {'block': stat.S_IFBLK,
- 'char': stat.S_IFCHR,
- 'fifo': stat.S_IFIFO}
-
-# map between permissions characters and numeric ACL constants
-acl_map = dict(r=posix1e.ACL_READ,
- w=posix1e.ACL_WRITE,
- x=posix1e.ACL_EXECUTE)
-
-
-class POSIX(Bcfg2.Client.Tools.Tool):
- """POSIX File support code."""
- name = 'POSIX'
- __handles__ = [('Path', 'device'),
- ('Path', 'directory'),
- ('Path', 'file'),
- ('Path', 'hardlink'),
- ('Path', 'nonexistent'),
- ('Path', 'permissions'),
- ('Path', 'symlink')]
- __req__ = dict(Path=dict(
- device=['name', 'dev_type', 'perms', 'owner', 'group'],
- directory=['name', 'perms', 'owner', 'group'],
- file=['name', 'perms', 'owner', 'group'],
- hardlink=['name', 'to'],
- nonexistent=['name'],
- permissions=['name', 'perms', 'owner', 'group'],
- symlink=['name', 'to']))
-
- # grab paranoid options from /etc/bcfg2.conf
- opts = {'ppath': Bcfg2.Options.PARANOID_PATH,
- 'max_copies': Bcfg2.Options.PARANOID_MAX_COPIES}
- setup = Bcfg2.Options.OptionParser(opts)
- setup.parse([])
- ppath = setup['ppath']
- max_copies = setup['max_copies']
-
- def canInstall(self, entry):
- """Check if entry is complete for installation."""
- if Bcfg2.Client.Tools.Tool.canInstall(self, entry):
- if (entry.get('type') == 'file' and
- entry.text is None and
- entry.get('empty', 'false') == 'false'):
- return False
- return True
- else:
- return False
-
- def gatherCurrentData(self, entry):
- if entry.tag == 'Path' and entry.get('type') == 'file':
- try:
- ondisk = os.stat(entry.get('name'))
- except OSError:
- entry.set('current_exists', 'false')
- self.logger.debug("%s %s does not exist" %
- (entry.tag, entry.get('name')))
- return False
- try:
- entry.set('current_owner', str(ondisk[stat.ST_UID]))
- entry.set('current_group', str(ondisk[stat.ST_GID]))
- except (OSError, KeyError):
- pass
-
- if has_selinux:
- try:
- entry.set('current_secontext',
- selinux.getfilecon(entry.get('name'))[1])
- except (OSError, KeyError):
- pass
- entry.set('perms', str(oct(ondisk[stat.ST_MODE])[-4:]))
-
- def _set_perms(self, entry, path=None):
- if path is None:
- path = entry.get("name")
-
- if (entry.get('perms') == None or
- entry.get('owner') == None or
- entry.get('group') == None):
- self.logger.error('Entry %s not completely specified. '
- 'Try running bcfg2-lint.' % entry.get('name'))
- return False
-
- rv = True
- # split this into multiple try...except blocks so that even if a
- # chown fails, the chmod can succeed -- get as close to the
- # desired state as we can
- try:
- self.logger.debug("Setting ownership of %s to %s:%s" %
- (path,
- self._norm_entry_uid(entry),
- self._norm_entry_gid(entry)))
- os.chown(path, self._norm_entry_uid(entry),
- self._norm_entry_gid(entry))
- except KeyError:
- self.logger.error('Failed to change ownership of %s' % path)
- rv = False
- os.chown(path, 0, 0)
- except OSError:
- self.logger.error('Failed to change ownership of %s' % path)
- rv = False
-
- configPerms = int(entry.get('perms'), 8)
- if entry.get('dev_type'):
- configPerms |= device_map[entry.get('dev_type')]
- try:
- self.logger.debug("Setting permissions on %s to %s" %
- (path, oct(configPerms)))
- os.chmod(path, configPerms)
- except (OSError, KeyError):
- self.logger.error('Failed to change permissions mode of %s' % path)
- rv = False
-
- recursive = entry.get("recursive", "false").lower() == "true"
- return (self._set_secontext(entry, path=path, recursive=recursive) and
- self._set_acls(entry, path=path, recursive=recursive) and
- rv)
-
- def _set_acls(self, entry, path=None, recursive=True):
- """ set POSIX ACLs on the file on disk according to the config """
- if not has_acls:
- if entry.findall("ACL"):
- self.logger.debug("ACLs listed for %s but no pylibacl library "
- "installed" % entry.get('name'))
- return True
-
- if path is None:
- path = entry.get("name")
-
- acl = posix1e.ACL(file=path)
- # clear ACLs out so we start fresh -- way easier than trying
- # to add/remove/modify ACLs
- for aclentry in acl:
- if aclentry.tag_type in [posix1e.ACL_USER, posix1e.ACL_GROUP]:
- acl.delete_entry(aclentry)
- if os.path.isdir(path):
- defacl = posix1e.ACL(filedef=path)
- if not defacl.valid():
- # when a default ACL is queried on a directory that
- # has no default ACL entries at all, you get an empty
- # ACL, which is not valid. in this circumstance, we
- # just copy the access ACL to get a base valid ACL
- # that we can add things to.
- defacl = posix1e.ACL(acl=acl)
- else:
- for aclentry in defacl:
- if aclentry.tag_type in [posix1e.ACL_USER,
- posix1e.ACL_GROUP]:
- defacl.delete_entry(aclentry)
- else:
- defacl = None
-
- for aclkey, perms in self._list_entry_acls(entry).items():
- atype, scope, qualifier = aclkey
- if atype == "default":
- if defacl is None:
- self.logger.warning("Cannot set default ACLs on "
- "non-directory %s" % path)
- continue
- entry = posix1e.Entry(defacl)
- else:
- entry = posix1e.Entry(acl)
- for perm in acl_map.values():
- if perm & perms:
- entry.permset.add(perm)
- entry.tag_type = scope
- try:
- if scope == posix1e.ACL_USER:
- scopename = "user"
- entry.qualifier = self._norm_uid(qualifier)
- elif scope == posix1e.ACL_GROUP:
- scopename = "group"
- entry.qualifier = self._norm_gid(qualifier)
- except (OSError, KeyError):
- err = sys.exc_info()[1]
- self.logger.error("Could not resolve %s %s: %s" %
- (scopename, qualifier, err))
- continue
- acl.calc_mask()
-
- def _apply_acl(acl, path, atype=posix1e.ACL_TYPE_ACCESS):
- if atype == posix1e.ACL_TYPE_ACCESS:
- atype_str = "access"
- else:
- atype_str = "default"
- if acl.valid():
- self.logger.debug("Applying %s ACL to %s:" % (atype_str, path))
- for line in str(acl).splitlines():
- self.logger.debug(" " + line)
- try:
- acl.applyto(path, atype)
- return True
- except:
- err = sys.exc_info()[1]
- self.logger.error("Failed to set ACLs on %s: %s" %
- (path, err))
- return False
- else:
- self.logger.warning("%s ACL created for %s was invalid:" %
- (atype_str.title(), path))
- for line in str(acl).splitlines():
- self.logger.warning(" " + line)
- return False
-
- rv = _apply_acl(acl, path)
- if defacl:
- defacl.calc_mask()
- rv &= _apply_acl(defacl, path, posix1e.ACL_TYPE_DEFAULT)
- if recursive:
- for root, dirs, files in os.walk(path):
- for p in dirs + files:
- rv &= _apply_acl(acl, p)
- if defacl:
- rv &= _apply_acl(defacl, p, posix1e.ACL_TYPE_DEFAULT)
- return rv
-
- def _set_secontext(self, entry, path=None, recursive=False):
- """ set the SELinux context of the file on disk according to the
- config"""
- if not has_selinux:
- return True
-
- if path is None:
- path = entry.get("name")
- context = entry.get("secontext")
- if context is None:
- # no context listed
- return True
-
- rv = True
- if context == '__default__':
- try:
- selinux.restorecon(path, recursive=recursive)
- except:
- err = sys.exc_info()[1]
- self.logger.error("Failed to restore SELinux context for %s: %s"
- % (path, err))
- rv = False
- else:
- try:
- rv &= selinux.lsetfilecon(path, context) == 0
- except:
- err = sys.exc_info()[1]
- self.logger.error("Failed to restore SELinux context for %s: %s"
- % (path, err))
- rv = False
-
- if recursive:
- for root, dirs, files in os.walk(path):
- for p in dirs + files:
- try:
- rv &= selinux.lsetfilecon(p, context) == 0
- except:
- err = sys.exc_info()[1]
- self.logger.error("Failed to restore SELinux "
- "context for %s: %s" %
- (path, err))
- rv = False
- return rv
-
- def _secontext_matches(self, entry):
- """ determine if the SELinux context of the file on disk matches
- the desired context """
- if not has_selinux:
- # no selinux libraries
- return True
-
- path = entry.get("path")
- context = entry.get("secontext")
- if context is None:
- # no context listed
- return True
-
- if context == '__default__':
- if selinux.getfilecon(entry.get('name'))[1] == \
- selinux.matchpathcon(entry.get('name'), 0)[1]:
- return True
- else:
- return False
- elif selinux.getfilecon(entry.get('name'))[1] == context:
- return True
- else:
- return False
-
- def _norm_gid(self, gid):
- """ This takes a group name or gid and returns the
- corresponding gid. """
- try:
- return int(gid)
- except ValueError:
- return int(grp.getgrnam(gid)[2])
-
- def _norm_entry_gid(self, entry):
- try:
- return self._norm_gid(entry.get('group'))
- except (OSError, KeyError):
- err = sys.exc_info()[1]
- self.logger.error('GID normalization failed for %s on %s: %s' %
- (entry.get('group'), entry.get('name'), err))
- return False
-
- def _norm_uid(self, uid):
- """ This takes a username or uid and returns the
- corresponding uid. """
- try:
- return int(uid)
- except ValueError:
- return int(pwd.getpwnam(uid)[2])
-
- def _norm_entry_uid(self, entry):
- try:
- return self._norm_uid(entry.get("owner"))
- except (OSError, KeyError):
- err = sys.exc_info()[1]
- self.logger.error('UID normalization failed for %s on %s: %s' %
- (entry.get('owner'), entry.get('name'), err))
- return False
-
- def _norm_acl_perms(self, perms):
- """ takes a representation of an ACL permset and returns a digit
- representing the permissions entailed by it. representations can
- either be a single octal digit, a string of up to three 'r',
- 'w', 'x', or '-' characters, or a posix1e.Permset object"""
- if hasattr(perms, 'test'):
- # Permset object
- return sum([p for p in acl_map.values()
- if perms.test(p)])
-
- try:
- # single octal digit
- return int(perms)
- except ValueError:
- # couldn't be converted to an int; process as a string
- rv = 0
- for char in perms:
- if char == '-':
- continue
- elif char not in acl_map:
- self.logger.error("Unknown permissions character in ACL: %s"
- % char)
- return 0
- else:
- rv |= acl_map[char]
- return rv
-
- def _acl2string(self, aclkey, perms):
- atype, scope, qualifier = aclkey
- acl_str = []
- if atype == 'default':
- acl_str.append(atype)
- if scope == posix1e.ACL_USER:
- acl_str.append("user")
- elif scope == posix1e.ACL_GROUP:
- acl_str.append("group")
- acl_str.append(qualifier)
- acl_str.append(self._acl_perm2string(perms))
- return ":".join(acl_str)
-
- def _acl_perm2string(self, perm):
- rv = []
- for char in 'rwx':
- if acl_map[char] & perm:
- rv.append(char)
- else:
- rv.append('-')
- return ''.join(rv)
-
- def _is_string(self, strng, encoding):
- """ Returns true if the string contains no ASCII control
- characters and can be decoded from the specified encoding. """
- for char in strng:
- if ord(char) < 9 or ord(char) > 13 and ord(char) < 32:
- return False
- try:
- strng.decode(encoding)
- return True
- except:
- return False
-
- def Verifydevice(self, entry, _):
- """Verify device entry."""
- if entry.get('dev_type') in ['block', 'char']:
- # check if major/minor are properly specified
- if (entry.get('major') == None or
- entry.get('minor') == None):
- self.logger.error('Entry %s not completely specified. '
- 'Try running bcfg2-lint.' %
- (entry.get('name')))
- return False
-
- try:
- ondisk = os.stat(path)
- except OSError:
- entry.set('current_exists', 'false')
- self.logger.debug("%s %s does not exist" %
- (entry.tag, path))
- return False
-
- rv = self._verify_metadata(entry)
-
- # attempt to verify device properties as specified in config
- dev_type = entry.get('dev_type')
- if dev_type in ['block', 'char']:
- major = int(entry.get('major'))
- minor = int(entry.get('minor'))
- if major != os.major(ondisk.st_rdev):
- entry.set('current_mtime', mtime)
- msg = ("Major number for device %s is incorrect. "
- "Current major is %s but should be %s" %
- (path, os.major(ondisk.st_rdev), major))
- self.logger.debug(msg)
- entry.set('qtext', entry.get('qtext') + "\n" + msg)
- rv = False
-
- if minor != os.minor(ondisk.st_rdev):
- entry.set('current_mtime', mtime)
- msg = ("Minor number for device %s is incorrect. "
- "Current minor is %s but should be %s" %
- (path, os.minor(ondisk.st_rdev), minor))
- self.logger.debug(msg)
- entry.set('qtext', entry.get('qtext') + "\n" + msg)
- rv = False
-
- return rv
-
- def Installdevice(self, entry):
- """Install device entries."""
- try:
- # check for existing paths and remove them
- os.lstat(entry.get('name'))
- try:
- os.unlink(entry.get('name'))
- exists = False
- except OSError:
- self.logger.info('Failed to unlink %s' %
- entry.get('name'))
- return False
- except OSError:
- exists = False
-
- if not exists:
- try:
- dev_type = entry.get('dev_type')
- mode = device_map[dev_type] | int(entry.get('mode', '0600'), 8)
- if dev_type in ['block', 'char']:
- # check if major/minor are properly specified
- if (entry.get('major') == None or
- entry.get('minor') == None):
- self.logger.error('Entry %s not completely specified. '
- 'Try running bcfg2-lint.' %
- entry.get('name'))
- return False
- major = int(entry.get('major'))
- minor = int(entry.get('minor'))
- device = os.makedev(major, minor)
- os.mknod(entry.get('name'), mode, device)
- else:
- os.mknod(entry.get('name'), mode)
- return self._set_perms(entry)
- except KeyError:
- self.logger.error('Failed to install %s' % entry.get('name'))
- except OSError:
- self.logger.error('Failed to install %s' % entry.get('name'))
- return False
-
- def Verifydirectory(self, entry, modlist):
- """Verify Path type='directory' entry."""
- pruneTrue = True
- ex_ents = []
- if (entry.get('prune', 'false') == 'true'
- and (entry.tag == 'Path' and entry.get('type') == 'directory')):
- # check for any extra entries when prune='true' attribute is set
- try:
- entries = ['/'.join([entry.get('name'), ent])
- for ent in os.listdir(entry.get('name'))]
- ex_ents = [e for e in entries if e not in modlist]
- if ex_ents:
- pruneTrue = False
- self.logger.info("POSIX: Directory %s contains "
- "extra entries:" % entry.get('name'))
- self.logger.info(ex_ents)
- nqtext = entry.get('qtext', '') + '\n'
- nqtext += "Directory %s contains extra entries: " % \
- entry.get('name')
- nqtext += ":".join(ex_ents)
- entry.set('qtext', nqtext)
- [entry.append(XML.Element('Prune', path=x))
- for x in ex_ents]
- except OSError:
- ex_ents = []
- pruneTrue = True
-
- return pruneTrue and self._verify_metadata(entry)
-
- def Installdirectory(self, entry):
- """Install Path type='directory' entry."""
- self.logger.info("Installing directory %s" % entry.get('name'))
- try:
- fmode = os.lstat(entry.get('name'))
- except OSError:
- # stat failed
- exists = False
-
- if not stat.S_ISDIR(fmode[stat.ST_MODE]):
- self.logger.debug("Found a non-directory entry at %s" %
- entry.get('name'))
- try:
- os.unlink(entry.get('name'))
- exists = False
- except OSError:
- self.logger.info("Failed to unlink %s" % entry.get('name'))
- return False
- else:
- self.logger.debug("Found a pre-existing directory at %s" %
- entry.get('name'))
- exists = True
-
- if not exists:
- parent = "/".join(entry.get('name').split('/')[:-1])
- if parent:
- try:
- os.stat(parent)
- except:
- self.logger.debug('Creating parent path for directory %s' %
- entry.get('name'))
- for idx in range(len(parent.split('/')[:-1])):
- current = '/' + '/'.join(parent.split('/')[1:2+idx])
- try:
- sloc = os.stat(current)
- except OSError:
- try:
- os.mkdir(current)
- continue
- except OSError:
- return False
- if not stat.S_ISDIR(sloc[stat.ST_MODE]):
- try:
- os.unlink(current)
- os.mkdir(current)
- except OSError:
- return False
-
- try:
- os.mkdir(entry.get('name'))
- except OSError:
- self.logger.error('Failed to create directory %s' %
- entry.get('name'))
- return False
- if entry.get('prune', 'false') == 'true' and entry.get("qtext"):
- for pent in entry.findall('Prune'):
- pname = pent.get('path')
- ulfailed = False
- if os.path.isdir(pname):
- self.logger.info("Not removing extra directory %s, "
- "please check and remove manually" % pname)
- continue
- try:
- self.logger.debug("Unlinking file %s" % pname)
- os.unlink(pname)
- except OSError:
- self.logger.error("Failed to unlink path %s" % pname)
- ulfailed = True
- if ulfailed:
- return False
- return self.Installpermissions(entry)
-
- def Verifyfile(self, entry, _):
- """Verify Path type='file' entry."""
- # permissions check + content check
- permissionStatus = self._verify_metadata(entry)
- tbin = False
- if entry.text == None and entry.get('empty', 'false') == 'false':
- self.logger.error("Cannot verify incomplete Path type='%s' %s" %
- (entry.get('type'), entry.get('name')))
- return False
- if entry.get('encoding', 'ascii') == 'base64':
- tempdata = binascii.a2b_base64(entry.text)
- tbin = True
- elif entry.get('empty', 'false') == 'true':
- tempdata = ''
- else:
- tempdata = entry.text
- if type(tempdata) == unicode:
- try:
- tempdata = tempdata.encode(self.setup['encoding'])
- except UnicodeEncodeError:
- e = sys.exc_info()[1]
- self.logger.error("Error encoding file %s:\n %s" %
- (entry.get('name'), e))
-
- different = False
- content = None
- if not os.path.exists(entry.get("name")):
- # first, see if the target file exists at all; if not,
- # they're clearly different
- different = True
- content = ""
- else:
- # next, see if the size of the target file is different
- # from the size of the desired content
- try:
- estat = os.stat(entry.get('name'))
- except OSError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to stat %s: %s" %
- (err.filename, err))
- return False
- if len(tempdata) != estat[stat.ST_SIZE]:
- different = True
- else:
- # finally, read in the target file and compare them
- # directly. comparison could be done with a checksum,
- # which might be faster for big binary files, but
- # slower for everything else
- try:
- content = open(entry.get('name')).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to read %s: %s" %
- (err.filename, err))
- return False
- different = content != tempdata
-
- if different:
- if self.setup['interactive']:
- prompt = [entry.get('qtext', '')]
- if not tbin and content is None:
- # it's possible that we figured out the files are
- # different without reading in the local file. if
- # the supplied version of the file is not binary,
- # we now have to read in the local file to figure
- # out if _it_ is binary, and either include that
- # fact or the diff in our prompts for -I
- try:
- content = open(entry.get('name')).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to read %s: %s" %
- (err.filename, err))
- return False
- if tbin or not self._is_string(content, self.setup['encoding']):
- # don't compute diffs if the file is binary
- prompt.append('Binary file, no printable diff')
- else:
- diff = self._diff(content, tempdata,
- difflib.unified_diff,
- filename=entry.get("name"))
- if diff:
- udiff = '\n'.join(diff)
- try:
- prompt.append(udiff.decode(self.setup['encoding']))
- except UnicodeDecodeError:
- prompt.append("Binary file, no printable diff")
- else:
- prompt.append("Diff took too long to compute, no "
- "printable diff")
- entry.set("qtext", "\n".join(prompt))
-
- if entry.get('sensitive', 'false').lower() != 'true':
- if content is None:
- # it's possible that we figured out the files are
- # different without reading in the local file. we
- # now have to read in the local file to figure out
- # if _it_ is binary, and either include the whole
- # file or the diff for reports
- try:
- content = open(entry.get('name')).read()
- except IOError:
- err = sys.exc_info()[1]
- self.logger.error("Failed to read %s: %s" %
- (err.filename, err))
- return False
-
- if tbin or not self._is_string(content, self.setup['encoding']):
- # don't compute diffs if the file is binary
- entry.set('current_bfile', binascii.b2a_base64(content))
- else:
- diff = self._diff(content, tempdata, difflib.ndiff,
- filename=entry.get("name"))
- if diff:
- entry.set("current_bdiff",
- binascii.b2a_base64("\n".join(diff)))
- elif not tbin and self._is_string(content,
- self.setup['encoding']):
- entry.set('current_bfile', binascii.b2a_base64(content))
-
- return permissionStatus and not different
-
- def Installfile(self, entry):
- """Install Path type='file' entry."""
- self.logger.info("Installing file %s" % (entry.get('name')))
-
- parent = "/".join(entry.get('name').split('/')[:-1])
- if parent:
- try:
- os.stat(parent)
- except:
- self.logger.debug('Creating parent path for config file %s' %
- entry.get('name'))
- current = '/'
- for next in parent.split('/')[1:]:
- current += next + '/'
- try:
- sloc = os.stat(current)
- try:
- if not stat.S_ISDIR(sloc[stat.ST_MODE]):
- self.logger.debug('%s is not a directory; recreating'
- % current)
- os.unlink(current)
- os.mkdir(current)
- except OSError:
- return False
- except OSError:
- try:
- self.logger.debug("Creating non-existent path %s" %
- current)
- os.mkdir(current)
- except OSError:
- return False
-
- # If we get here, then the parent directory should exist
- if (entry.get("paranoid", 'false').lower() == 'true' and
- self.setup.get("paranoid", False) and
- entry.get('current_exists', 'true') != 'false'):
- bkupnam = entry.get('name').replace('/', '_')
- # current list of backups for this file
- try:
- bkuplist = [f for f in os.listdir(self.ppath) if
- f.startswith(bkupnam)]
- except OSError:
- e = sys.exc_info()[1]
- self.logger.error("Failed to create backup list in %s: %s" %
- (self.ppath, e.strerror))
- return False
- bkuplist.sort()
- while len(bkuplist) >= int(self.max_copies):
- # remove the oldest backup available
- oldest = bkuplist.pop(0)
- self.logger.info("Removing %s" % oldest)
- try:
- os.remove("%s/%s" % (self.ppath, oldest))
- except:
- self.logger.error("Failed to remove %s/%s" %
- (self.ppath, oldest))
- return False
- try:
- # backup existing file
- shutil.copy(entry.get('name'),
- "%s/%s_%s" % (self.ppath, bkupnam,
- datetime.isoformat(datetime.now())))
- self.logger.info("Backup of %s saved to %s" %
- (entry.get('name'), self.ppath))
- except IOError:
- e = sys.exc_info()[1]
- self.logger.error("Failed to create backup file for %s" %
- entry.get('name'))
- self.logger.error(e)
- return False
- try:
- newfile = open("%s.new"%(entry.get('name')), 'w')
- if entry.get('encoding', 'ascii') == 'base64':
- filedata = binascii.a2b_base64(entry.text)
- elif entry.get('empty', 'false') == 'true':
- filedata = ''
- else:
- if type(entry.text) == unicode:
- filedata = entry.text.encode(self.setup['encoding'])
- else:
- filedata = entry.text
- newfile.write(filedata)
- newfile.close()
-
- rv = self._set_perms(entry, newfile.name)
- os.rename(newfile.name, entry.get('name'))
- if entry.get('mtime'):
- try:
- os.utime(entry.get('name'), (int(entry.get('mtime')),
- int(entry.get('mtime'))))
- except:
- self.logger.error("Failed to set mtime of %s" % path)
- rv = False
- return rv
- except (OSError, IOError):
- err = sys.exc_info()[1]
- self.logger.error("Failed to open %s for writing: %s" %
- (entry.get('name'), err))
- return False
-
- def Verifyhardlink(self, entry, _):
- """Verify HardLink entry."""
- rv = True
-
- try:
- if not os.path.samefile(entry.get('name'), entry.get('to')):
- msg = "Hardlink %s is incorrect." % entry.get('name')
- self.logger.debug(msg)
- entry.set('qtext', "\n".join([entry.get('qtext', ''), msg]))
- rv = False
- except OSError:
- entry.set('current_exists', 'false')
- return False
-
- rv &= self._verify_secontext(entry)
- return rv
-
- def Installhardlink(self, entry):
- """Install HardLink entry."""
- self.logger.info("Installing Hardlink %s" % entry.get('name'))
- if os.path.lexists(entry.get('name')):
- try:
- fmode = os.lstat(entry.get('name'))[stat.ST_MODE]
- if stat.S_ISREG(fmode) or stat.S_ISLNK(fmode):
- self.logger.debug("Non-directory entry already exists at "
- "%s. Unlinking entry." %
- entry.get('name'))
- os.unlink(entry.get('name'))
- elif stat.S_ISDIR(fmode):
- self.logger.debug("Directory already exists at %s" %
- entry.get('name'))
- self.cmd.run("mv %s/ %s.bak" % (entry.get('name'),
- entry.get('name')))
- else:
- os.unlink(entry.get('name'))
- except OSError:
- self.logger.info("Hardlink %s cleanup failed" % \
- (entry.get('name')))
- try:
- os.link(entry.get('to'), entry.get('name'))
- return self._set_perms(entry)
- except OSError:
- return False
-
- def Verifynonexistent(self, entry, _):
- """Verify nonexistent entry."""
- # return true if path does _not_ exist
- return not os.path.lexists(entry.get('name'))
-
- def Installnonexistent(self, entry):
- '''Remove nonexistent entries'''
- ename = entry.get('name')
- if entry.get('recursive').lower() == 'true':
- # ensure that configuration spec is consistent first
- if [e for e in self.buildModlist() \
- if e.startswith(ename) and e != ename]:
- self.logger.error('Not installing %s. One or more files '
- 'in this directory are specified in '
- 'your configuration.' % ename)
- return False
- try:
- shutil.rmtree(ename)
- except OSError:
- e = sys.exc_info()[1]
- self.logger.error('Failed to remove %s: %s' % (ename,
- e.strerror))
- else:
- if os.path.islink(ename):
- os.remove(ename)
- return True
- elif os.path.isdir(ename):
- try:
- os.rmdir(ename)
- return True
- except OSError:
- e = sys.exc_info()[1]
- self.logger.error('Failed to remove %s: %s' % (ename,
- e.strerror))
- return False
- try:
- os.remove(ename)
- return True
- except OSError:
- e = sys.exc_info()[1]
- self.logger.error('Failed to remove %s: %s' % (ename,
- e.strerror))
- return False
-
- def Verifypermissions(self, entry, _):
- """Verify Path type='permissions' entry"""
- rv = self._verify_metadata(entry)
-
- if entry.get('recursive', 'false').lower() == 'true':
- # verify ownership information recursively
- for root, dirs, files in os.walk(entry.get('name')):
- for p in dirs + files:
- rv &= self._verify_metadata(entry,
- path=os.path.join(root, p))
- return rv
-
- def Installpermissions(self, entry):
- """Install POSIX permissions"""
- plist = [entry.get('name')]
- if entry.get('recursive', 'false').lower() == 'true':
- # verify ownership information recursively
- for root, dirs, files in os.walk(entry.get('name')):
- for p in dirs + files:
- if not self._verify_metadata(entry,
- path=os.path.join(root, p),
- checkonly=True):
- plist.append(path)
- rv = True
- for path in plist:
- rv &= self._set_perms(entry, path)
- return rv
-
- def Verifysymlink(self, entry, _):
- """Verify Path type='symlink' entry."""
- if entry.get('to') == None:
- self.logger.error('Entry %s not completely specified. '
- 'Try running bcfg2-lint.' %
- (entry.get('name')))
- return False
-
- rv = True
-
- try:
- sloc = os.readlink(entry.get('name'))
- if sloc != entry.get('to'):
- entry.set('current_to', sloc)
- msg = ("Symlink %s points to %s, should be %s" %
- (entry.get('name'), sloc, entry.get('to')))
- self.logger.debug(msg)
- entry.set('qtext', "\n".join([entry.get('qtext', ''), msg]))
- rv = False
- except OSError:
- entry.set('current_exists', 'false')
- return False
-
- rv &= self._verify_secontext(entry)
- return rv
-
- def Installsymlink(self, entry):
- """Install Path type='symlink' entry."""
- if entry.get('to') == None:
- self.logger.error('Entry %s not completely specified. '
- 'Try running bcfg2-lint.' % entry.get('name'))
- return False
- self.logger.info("Installing symlink %s" % (entry.get('name')))
- if os.path.lexists(entry.get('name')):
- try:
- fmode = os.lstat(entry.get('name'))[stat.ST_MODE]
- if stat.S_ISREG(fmode) or stat.S_ISLNK(fmode):
- self.logger.debug("Non-directory entry already exists at "
- "%s. Unlinking entry." %
- entry.get('name'))
- os.unlink(entry.get('name'))
- elif stat.S_ISDIR(fmode):
- self.logger.debug("Directory already exists at %s" %
- entry.get('name'))
- self.cmd.run("mv %s/ %s.bak" % (entry.get('name'),
- entry.get('name')))
- else:
- os.unlink(entry.get('name'))
- except OSError:
- self.logger.info("Symlink %s cleanup failed" %
- (entry.get('name')))
- try:
- os.symlink(entry.get('to'), entry.get('name'))
- return self._set_setcontext(entry)
- except OSError:
- return False
-
- def InstallPath(self, entry):
- """Dispatch install to the proper method according to type"""
- ret = getattr(self, 'Install%s' % entry.get('type'))
- return ret(entry)
-
- def VerifyPath(self, entry, _):
- """Dispatch verify to the proper method according to type"""
- ret = getattr(self, 'Verify%s' % entry.get('type'))(entry, _)
- if entry.get('qtext') and self.setup['interactive']:
- entry.set('qtext',
- '%s\nInstall %s %s: (y/N) ' %
- (entry.get('qtext'),
- entry.get('type'), entry.get('name')))
- return ret
-
- def _verify_metadata(self, entry, path=None, checkonly=False):
- """ generic method to verify perms, owner, group, secontext,
- and mtime """
-
- # allow setting an alternate path for recursive permissions checking
- if path is None:
- path = entry.get('name')
-
- while len(entry.get('perms', '')) < 4:
- entry.set('perms', '0' + entry.get('perms', ''))
-
- try:
- ondisk = os.stat(path)
- except OSError:
- entry.set('current_exists', 'false')
- self.logger.debug("POSIX: %s %s does not exist" %
- (entry.tag, path))
- return False
-
- try:
- owner = str(ondisk[stat.ST_UID])
- group = str(ondisk[stat.ST_GID])
- except (OSError, KeyError):
- self.logger.error('POSIX: User/Group resolution failed for path %s'
- % path)
- owner = 'root'
- group = '0'
-
- perms = oct(ondisk[stat.ST_MODE])[-4:]
- if entry.get('mtime', '-1') != '-1':
- mtime = str(ondisk[stat.ST_MTIME])
- else:
- mtime = '-1'
-
- configOwner = str(self._norm_entry_uid(entry))
- configGroup = str(self._norm_entry_gid(entry))
- configPerms = int(entry.get('perms'), 8)
- if entry.get('dev_type'):
- configPerms |= device_map[entry.get('dev_type')]
- if has_selinux:
- if entry.get("secontext") == "__default__":
- try:
- configContext = selinux.matchpathcon(path, 0)[1]
- except OSError:
- self.logger.warning("Failed to get default SELinux context "
- "for %s; missing fcontext rule?" %
- path)
- return False
- else:
- configContext = entry.get("secontext")
-
- errors = []
- if owner != configOwner:
- if checkonly:
- return False
- entry.set('current_owner', owner)
- errors.append("POSIX: Owner for path %s is incorrect. "
- "Current owner is %s but should be %s" %
- (path, ondisk.st_uid, entry.get('owner')))
-
- if group != configGroup:
- if checkonly:
- return False
- entry.set('current_group', group)
- errors.append("POSIX: Group for path %s is incorrect. "
- "Current group is %s but should be %s" %
- (path, ondisk.st_gid, entry.get('group')))
-
- if oct(int(perms, 8)) != oct(configPerms):
- if checkonly:
- return False
- entry.set('current_perms', perms)
- errors.append("POSIX: Permissions for path %s are incorrect. "
- "Current permissions are %s but should be %s" %
- (path, perms, entry.get('perms')))
-
- if entry.get('mtime') and mtime != entry.get('mtime', '-1'):
- if checkonly:
- return False
- entry.set('current_mtime', mtime)
- errors.append("POSIX: mtime for path %s is incorrect. "
- "Current mtime is %s but should be %s" %
- (path, mtime, entry.get('mtime')))
-
- seVerifies = self._verify_secontext(entry)
- aclVerifies = self._verify_acls(entry)
-
- if errors:
- for error in errors:
- self.logger.debug(error)
- entry.set('qtext', "\n".join([entry.get('qtext', '')] + errors))
- return False
- else:
- return seVerifies and aclVerifies
-
- def _list_entry_acls(self, entry):
- wanted = dict()
- for acl in entry.findall("ACL"):
- if acl.get("scope") == "user":
- scope = posix1e.ACL_USER
- elif acl.get("scope") == "group":
- scope = posix1e.ACL_GROUP
- else:
- self.logger.error("Unknown ACL scope %s" % acl.get("scope"))
- continue
- wanted[(acl.get("type"), scope, acl.get(acl.get("scope")))] = \
- self._norm_acl_perms(acl.get('perms'))
- return wanted
-
- def _list_file_acls(self, entry):
- def _process_acl(acl, atype):
- try:
- if acl.tag_type == posix1e.ACL_USER:
- qual = pwd.getpwuid(acl.qualifier)[0]
- elif acl.tag_type == posix1e.ACL_GROUP:
- qual = grp.getgrgid(acl.qualifier)[0]
- else:
- return
- except (OSError, KeyError):
- err = sys.exc_info()[1]
- self.logger.error("Lookup of %s %s failed: %s" %
- (scope, acl.qualifier, err))
- qual = acl.qualifier
- existing[(atype, acl.tag_type, qual)] = \
- self._norm_acl_perms(acl.permset)
-
- existing = dict()
- for acl in posix1e.ACL(file=entry.get("name")):
- _process_acl(acl, "access")
- if os.path.isdir(entry.get("name")):
- for acl in posix1e.ACL(filedef=entry.get("name")):
- _process_acl(acl, "default")
- return existing
-
- def _verify_acls(self, entry):
- if not has_acls:
- if entry.findall("ACL"):
- self.logger.debug("ACLs listed for %s but no pylibacl library "
- "installed" % entry.get('name'))
- return True
-
- # create lists of normalized representations of the ACLs we want
- # and the ACLs we have. this will make them easier to compare
- # than trying to mine that data out of the ACL objects and XML
- # objects and compare it at the same time.
- wanted = self._list_entry_acls(entry)
- existing = self._list_file_acls(entry)
-
- missing = []
- extra = []
- wrong = []
- for aclkey, perms in wanted.items():
- acl_str = self._acl2string(aclkey, perms)
- if aclkey not in existing:
- missing.append(acl_str)
- elif existing[aclkey] != perms:
- wrong.append((acl_str,
- self._acl2string(aclkey, existing[aclkey])))
-
- for aclkey, perms in existing.items():
- if aclkey not in wanted:
- extra.append(self._acl2string(aclkey, perms))
-
- msg = []
- if missing:
- msg.append("%s ACLs are missing: %s" % (len(missing),
- ", ".join(missing)))
- if wrong:
- msg.append("%s ACLs are wrong: %s" %
- (len(wrong),
- "; ".join(["%s should be %s" % (e, w)
- for w, e in wrong])))
- if extra:
- msg.append("%s extra ACLs: %s" % (len(extra), ", ".join(extra)))
-
- if msg:
- msg.insert(0,
- "POSIX ACLs for path %s are incorrect." %
- entry.get("name"))
- self.logger.debug(msg[0])
- for line in msg[1:]:
- self.logger.debug(" " + line)
- entry.set('qtext', "\n".join([entry.get("qtext", '')] + msg))
- return False
- return True
-
- def _verify_secontext(self, entry):
- if not self._secontext_matches(entry):
- path = entry.get("name")
- if entry.get("secontext") == "__default__":
- configContext = selinux.matchpathcon(path, 0)[1]
- else:
- configContext = entry.get("secontext")
- pcontext = selinux.getfilecon(path)[1]
- entry.set('current_secontext', pcontext)
- msg = ("SELinux context for path %s is incorrect. "
- "Current context is %s but should be %s" %
- (path, pcontext, configContext))
- self.logger.debug("POSIX: " + msg)
- entry.set('qtext', "\n".join([entry.get("qtext", ''), msg]))
- return False
- return True
-
- def _diff(self, content1, content2, difffunc, filename=None):
- rv = []
- start = time.time()
- longtime = False
- for diffline in difffunc(content1.split('\n'),
- content2.split('\n')):
- now = time.time()
- rv.append(diffline)
- if now - start > 5 and not longtime:
- if filename:
- self.logger.info("Diff of %s taking a long time" %
- filename)
- else:
- self.logger.info("Diff taking a long time")
- longtime = True
- elif now - start > 30:
- if filename:
- self.logger.error("Diff of %s took too long; giving up" %
- filename)
- else:
- self.logger.error("Diff took too long; giving up")
- return False
- return rv
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Device.py b/src/lib/Bcfg2/Client/Tools/POSIX/Device.py
new file mode 100644
index 000000000..b8fb0f4d0
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/Device.py
@@ -0,0 +1,62 @@
+import os
+import sys
+from base import POSIXTool, device_map
+
+class POSIXDevice(POSIXTool):
+ __req__ = ['name', 'dev_type', 'perms', 'owner', 'group']
+
+ def fully_specified(self, entry):
+ if entry.get('dev_type') in ['block', 'char']:
+ # check if major/minor are properly specified
+ if (entry.get('major') == None or
+ entry.get('minor') == None):
+ return False
+ return True
+
+ def verify(self, entry, modlist):
+ """Verify device entry."""
+ ondisk = self._exists(entry)
+ if not ondisk:
+ return False
+
+ # attempt to verify device properties as specified in config
+ rv = True
+ dev_type = entry.get('dev_type')
+ if dev_type in ['block', 'char']:
+ major = int(entry.get('major'))
+ minor = int(entry.get('minor'))
+ if major != os.major(ondisk.st_rdev):
+ msg = ("Major number for device %s is incorrect. "
+ "Current major is %s but should be %s" %
+ (entry.get("name"), os.major(ondisk.st_rdev), major))
+ self.logger.debug('POSIX: ' + msg)
+ entry.set('qtext', entry.get('qtext', '') + "\n" + msg)
+ rv = False
+
+ if minor != os.minor(ondisk.st_rdev):
+ msg = ("Minor number for device %s is incorrect. "
+ "Current minor is %s but should be %s" %
+ (entry.get("name"), os.minor(ondisk.st_rdev), minor))
+ self.logger.debug('POSIX: ' + msg)
+ entry.set('qtext', entry.get('qtext', '') + "\n" + msg)
+ rv = False
+ return POSIXTool.verify(self, entry, modlist) and rv
+
+ def install(self, entry):
+ if not self._exists(entry, remove=True):
+ try:
+ dev_type = entry.get('dev_type')
+ mode = device_map[dev_type] | int(entry.get('perms'), 8)
+ if dev_type in ['block', 'char']:
+ major = int(entry.get('major'))
+ minor = int(entry.get('minor'))
+ device = os.makedev(major, minor)
+ os.mknod(entry.get('name'), mode, device)
+ else:
+ os.mknod(entry.get('name'), mode)
+ except (KeyError, OSError, ValueError):
+ err = sys.exc_info()[1]
+ self.logger.error('POSIX: Failed to install %s: %s' %
+ (entry.get('name'), err))
+ return False
+ return POSIXTool.install(self, entry)
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Directory.py b/src/lib/Bcfg2/Client/Tools/POSIX/Directory.py
new file mode 100644
index 000000000..4b0ad93ef
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/Directory.py
@@ -0,0 +1,86 @@
+import os
+import sys
+import stat
+import shutil
+import Bcfg2.Client.XML
+from base import POSIXTool
+
+class POSIXDirectory(POSIXTool):
+ __req__ = ['name', 'perms', 'owner', 'group']
+
+ def verify(self, entry, modlist):
+ ondisk = self._exists(entry)
+ if not ondisk:
+ return False
+
+ if not stat.S_ISDIR(ondisk[stat.ST_MODE]):
+ self.logger.info("POSIX: %s is not a directory" % entry.get('name'))
+ return False
+
+ pruneTrue = True
+ if entry.get('prune', 'false').lower() == 'true':
+ # check for any extra entries when prune='true' attribute is set
+ try:
+ extras = [os.path.join(entry.get('name'), ent)
+ for ent in os.listdir(entry.get('name'))
+ if os.path.join(entry.get('name'),
+ ent) not in modlist]
+ if extras:
+ pruneTrue = False
+ msg = "Directory %s contains extra entries: %s" % \
+ (entry.get('name'), "; ".join(extras))
+ self.logger.info("POSIX: " + msg)
+ entry.set('qtext', entry.get('qtext', '') + '\n' + msg)
+ for extra in extras:
+ Bcfg2.Client.XML.SubElement(entry, 'Prune', path=extra)
+ except OSError:
+ pruneTrue = True
+
+ return POSIXTool.verify(self, entry, modlist) and pruneTrue
+
+ def install(self, entry):
+ """Install device entries."""
+ fmode = self._exists(entry)
+
+ if fmode and not stat.S_ISDIR(fmode[stat.ST_MODE]):
+ self.logger.info("POSIX: Found a non-directory entry at %s, "
+ "removing" % entry.get('name'))
+ try:
+ os.unlink(entry.get('name'))
+ fmode = False
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to unlink %s: %s" %
+ (entry.get('name'), err))
+ return False
+ elif fmode:
+ self.logger.debug("POSIX: Found a pre-existing directory at %s" %
+ entry.get('name'))
+
+ rv = True
+ if not fmode:
+ rv &= self._makedirs(entry)
+
+ if entry.get('prune', 'false') == 'true':
+ ulfailed = False
+ for pent in entry.findall('Prune'):
+ pname = pent.get('path')
+ ulfailed = False
+ if os.path.isdir(pname):
+ rm = shutil.rmtree
+ else:
+ rm = os.unlink
+ try:
+ self.logger.debug("POSIX: Removing %s" % pname)
+ rm(pname)
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to unlink %s: %s" %
+ (pname, err))
+ ulfailed = True
+ if ulfailed:
+ # even if prune failed, we still want to install the
+ # entry to make sure that we get permissions and
+ # whatnot set
+ rv = False
+ return POSIXTool.install(self, entry) and rv
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/File.py b/src/lib/Bcfg2/Client/Tools/POSIX/File.py
new file mode 100644
index 000000000..73ed2d8bf
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/File.py
@@ -0,0 +1,219 @@
+import os
+import sys
+import stat
+import time
+import difflib
+import binascii
+import tempfile
+from base import POSIXTool
+
+# py3k compatibility
+if sys.hexversion >= 0x03000000:
+ unicode = str
+
+class POSIXFile(POSIXTool):
+ __req__ = ['name', 'perms', 'owner', 'group']
+
+ def fully_specified(self, entry):
+ return entry.text is not None or entry.get('empty', 'false') == 'true'
+
+ def _is_string(self, strng, encoding):
+ """ Returns true if the string contains no ASCII control
+ characters and can be decoded from the specified encoding. """
+ for char in strng:
+ if ord(char) < 9 or ord(char) > 13 and ord(char) < 32:
+ return False
+ try:
+ strng.decode(encoding)
+ return True
+ except:
+ return False
+
+ def _get_data(self, entry):
+ is_binary = False
+ if entry.get('encoding', 'ascii') == 'base64':
+ tempdata = binascii.a2b_base64(entry.text)
+ is_binary = True
+ elif entry.get('empty', 'false') == 'true':
+ tempdata = ''
+ else:
+ tempdata = entry.text
+ if isinstance(tempdata, unicode):
+ try:
+ tempdata = tempdata.encode(self.setup['encoding'])
+ except UnicodeEncodeError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Error encoding file %s: %s" %
+ (entry.get('name'), err))
+ return (tempdata, is_binary)
+
+ def verify(self, entry, modlist):
+ ondisk = self._exists(entry)
+ tempdata, is_binary = self._get_data(entry)
+
+ different = False
+ content = None
+ if not ondisk:
+ # first, see if the target file exists at all; if not,
+ # they're clearly different
+ different = True
+ content = ""
+ elif len(tempdata) != ondisk[stat.ST_SIZE]:
+ # next, see if the size of the target file is different
+ # from the size of the desired content
+ different = True
+ else:
+ # finally, read in the target file and compare them
+ # directly. comparison could be done with a checksum,
+ # which might be faster for big binary files, but slower
+ # for everything else
+ try:
+ content = open(entry.get('name')).read()
+ except IOError:
+ self.logger.error("POSIX: Failed to read %s: %s" %
+ (entry.get("name"), sys.exc_info()[1]))
+ return False
+ different = content != tempdata
+
+ if different:
+ self.logger.debug("POSIX: %s has incorrect contents" %
+ entry.get("name"))
+ self._get_diffs(
+ entry, interactive=self.setup['interactive'],
+ sensitive=entry.get('sensitive', 'false').lower() == 'true',
+ is_binary=is_binary, content=content)
+ return POSIXTool.verify(self, entry, modlist) and not different
+
+ def _write_tmpfile(self, entry):
+ filedata, _ = self._get_data(entry)
+ # get a temp file to write to that is in the same directory as
+ # the existing file in order to preserve any permissions
+ # protections on that directory, and also to avoid issues with
+ # /tmp set nosetuid while creating files that are supposed to
+ # be setuid
+ try:
+ (newfd, newfile) = \
+ tempfile.mkstemp(prefix=os.path.basename(entry.get("name")),
+ dir=os.path.dirname(entry.get("name")))
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to create temp file in %s: %s" %
+ (os.path.dirname(entry.get('name')), err))
+ return False
+ try:
+ os.fdopen(newfd, 'w').write(filedata)
+ except (OSError, IOError):
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to open temp file %s for writing "
+ "%s: %s" %
+ (newfile, entry.get("name"), err))
+ return False
+ return newfile
+
+ def _rename_tmpfile(self, newfile, entry):
+ try:
+ os.rename(newfile, entry.get('name'))
+ return True
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to rename temp file %s to %s: %s" %
+ (newfile, entry.get('name'), err))
+ try:
+ os.unlink(newfile)
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Could not remove temp file %s: %s" %
+ (newfile, err))
+ return False
+
+ def install(self, entry):
+ """Install device entries."""
+ if not os.path.exists(os.path.dirname(entry.get('name'))):
+ if not self._makedirs(entry,
+ path=os.path.dirname(entry.get('name'))):
+ return False
+ newfile = self._write_tmpfile(entry)
+ if not newfile:
+ return False
+ rv = self._set_perms(entry, path=newfile)
+ if not self._rename_tmpfile(newfile, entry):
+ return False
+
+ return POSIXTool.install(self, entry) and rv
+
+ def _get_diffs(entry, interactive=False, sensitive=False, is_binary=False,
+ content=None):
+ if not interactive and sensitive:
+ return
+
+ prompt = [entry.get('qtext', '')]
+ attrs = dict()
+ if not is_binary and content is None:
+ # it's possible that we figured out the files are
+ # different without reading in the local file. if the
+ # supplied version of the file is not binary, we now have
+ # to read in the local file to figure out if _it_ is
+ # binary, and either include that fact or the diff in our
+ # prompts for -I and the reports
+ try:
+ content = open(entry.get('name')).read()
+ except IOError:
+ self.logger.error("POSIX: Failed to read %s: %s" %
+ (entry.get("name"), sys.exc_info()[1]))
+ return False
+ is_binary &= self._is_string(content, self.setup['encoding'])
+ if is_binary:
+ # don't compute diffs if the file is binary
+ prompt.append('Binary file, no printable diff')
+ attrs['current_bfile'] = binascii.b2a_base64(content)
+ else:
+ if interactive:
+ diff = self._diff(content, tempdata,
+ difflib.unified_diff,
+ filename=entry.get("name"))
+ if diff:
+ udiff = '\n'.join(diff)
+ try:
+ prompt.append(udiff.decode(self.setup['encoding']))
+ except UnicodeEncodeError:
+ prompt.append("Could not encode diff")
+ else:
+ prompt.append("Diff took too long to compute, no "
+ "printable diff")
+ if not sensitive:
+ diff = self._diff(content, tempdata, difflib.ndiff,
+ filename=entry.get("name"))
+ if diff:
+ ndiff = binascii.b2a_base64("\n".join(diff))
+ attrs["current_bdiff"] = ndiff
+ else:
+ attrs['current_bfile'] = binascii.b2a_base64(content)
+ if interactive:
+ entry.set("qtext", "\n".join(prompt))
+ if not sensitive:
+ for attr, val in attrs:
+ entry.set(attr, val)
+
+ def _diff(self, content1, content2, difffunc, filename=None):
+ rv = []
+ start = time.time()
+ longtime = False
+ for diffline in difffunc(content1.split('\n'),
+ content2.split('\n')):
+ now = time.time()
+ rv.append(diffline)
+ if now - start > 5 and not longtime:
+ if filename:
+ self.logger.info("POSIX: Diff of %s taking a long time" %
+ filename)
+ else:
+ self.logger.info("POSIX: Diff taking a long time")
+ longtime = True
+ elif now - start > 30:
+ if filename:
+ self.logger.error("POSIX: Diff of %s took too long; giving "
+ "up" % filename)
+ else:
+ self.logger.error("POSIX: Diff took too long; giving up")
+ return False
+ return rv
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py b/src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py
new file mode 100644
index 000000000..569ca3445
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/Hardlink.py
@@ -0,0 +1,39 @@
+import os
+import sys
+from base import POSIXTool
+
+class POSIXHardlink(POSIXTool):
+ __req__ = ['name', 'to']
+
+ def verify(self, entry, modlist):
+ rv = True
+
+ try:
+ if not os.path.samefile(entry.get('name'), entry.get('to')):
+ msg = "Hardlink %s is incorrect" % entry.get('name')
+ self.logger.debug("POSIX: " + msg)
+ entry.set('qtext', "\n".join([entry.get('qtext', ''), msg]))
+ rv = False
+ except OSError:
+ self.logger.debug("POSIX: %s %s does not exist" %
+ (entry.tag, entry.get("name")))
+ entry.set('current_exists', 'false')
+ return False
+
+ return POSIXTool.verify(self, entry, modlist) and rv
+
+ def install(self, entry):
+ ondisk = self._exists(entry, remove=True)
+ if ondisk:
+ self.logger.info("POSIX: Hardlink %s cleanup failed" %
+ entry.get('name'))
+ try:
+ os.link(entry.get('to'), entry.get('name'))
+ rv = True
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to create hardlink %s to %s: %s" %
+ (entry.get('name'), entry.get('to'), err))
+ rv = False
+ return POSIXTool.install(self, entry) and rv
+
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py b/src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py
new file mode 100644
index 000000000..64a36cce4
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/Nonexistent.py
@@ -0,0 +1,41 @@
+import os
+import sys
+import shutil
+from base import POSIXTool
+
+class POSIXNonexistent(POSIXTool):
+ __req__ = ['name']
+
+ def verify(self, entry, _):
+ if os.path.lexists(entry.get('name')):
+ self.logger.debug("POSIX: %s exists but should not" %
+ entry.get("name"))
+ return False
+ return True
+
+ def install(self, entry):
+ ename = entry.get('name')
+ if entry.get('recursive', '').lower() == 'true':
+ # ensure that configuration spec is consistent first
+ for struct in self.config.getchildren():
+ for entry in struct.getchildren():
+ if (entry.tag == 'Path' and
+ entry.get('type') != 'nonexistent' and
+ entry.get('name').startswith(ename)):
+ self.logger.error('POSIX: Not removing %s. One or '
+ 'more files in this directory are '
+ 'specified in your configuration.' %
+ ename)
+ return False
+ rm = shutil.rmtree
+ elif os.path.isdir(ename):
+ rm = os.rmdir
+ else:
+ rm = os.remove
+ try:
+ rm(ename)
+ return True
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error('POSIX: Failed to remove %s: %s' % (ename, err))
+ return False
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py b/src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py
new file mode 100644
index 000000000..c041b9ade
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/Permissions.py
@@ -0,0 +1,7 @@
+import os
+import sys
+from base import POSIXTool
+
+class POSIXPermissions(POSIXTool):
+ __req__ = ['name', 'perms', 'owner', 'group']
+
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py b/src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py
new file mode 100644
index 000000000..d5222513e
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/Symlink.py
@@ -0,0 +1,42 @@
+import os
+import sys
+from base import POSIXTool
+
+class POSIXSymlink(POSIXTool):
+ __req__ = ['name', 'to']
+
+ def verify(self, entry, modlist):
+ rv = True
+
+ try:
+ sloc = os.readlink(entry.get('name'))
+ if sloc != entry.get('to'):
+ entry.set('current_to', sloc)
+ msg = ("Symlink %s points to %s, should be %s" %
+ (entry.get('name'), sloc, entry.get('to')))
+ self.logger.debug("POSIX: " + msg)
+ entry.set('qtext', "\n".join([entry.get('qtext', ''), msg]))
+ rv = False
+ except OSError:
+ self.logger.debug("POSIX: %s %s does not exist" %
+ (entry.tag, entry.get("name")))
+ entry.set('current_exists', 'false')
+ return False
+
+ return POSIXTool.verify(self, entry, modlist) and rv
+
+ def install(self, entry):
+ ondisk = self._exists(entry, remove=True)
+ if ondisk:
+ self.logger.info("POSIX: Symlink %s cleanup failed" %
+ entry.get('name'))
+ try:
+ os.symlink(entry.get('to'), entry.get('name'))
+ rv = True
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to create symlink %s to %s: %s" %
+ (entry.get('name'), entry.get('to'), err))
+ rv = False
+ return POSIXTool.install(self, entry) and rv
+
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/__init__.py b/src/lib/Bcfg2/Client/Tools/POSIX/__init__.py
new file mode 100644
index 000000000..7e649a2c1
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/__init__.py
@@ -0,0 +1,147 @@
+"""All POSIX Type client support for Bcfg2."""
+
+import os
+import re
+import sys
+import shutil
+import pkgutil
+from datetime import datetime
+import Bcfg2.Client.Tools
+from base import POSIXTool
+
+class POSIX(Bcfg2.Client.Tools.Tool):
+ """POSIX File support code."""
+ name = 'POSIX'
+
+ def __init__(self, logger, setup, config):
+ Bcfg2.Client.Tools.Tool.__init__(self, logger, setup, config)
+ self.ppath = setup['ppath']
+ self.max_copies = setup['max_copies']
+ self._load_handlers()
+ self.logger.debug("POSIX: Handlers loaded: %s" %
+ (", ".join(self._handlers.keys())))
+ self.__req__ = dict(Path=dict())
+ for etype, hdlr in self._handlers.items():
+ self.__req__['Path'][etype] = hdlr.__req__
+ self.__handles__.append(('Path', etype))
+ # Tool.__init__() sets up the list of handled entries, but we
+ # need to do it again after __handles__ has been populated. we
+ # can't populate __handles__ when the class is created because
+ # _load_handlers() _must_ be called at run-time, not at
+ # compile-time.
+ for struct in config:
+ self.handled = [e for e in struct if self.handlesEntry(e)]
+
+ def _load_handlers(self):
+ # this must be called at run-time, not at compile-time, or we
+ # get wierd circular import issues.
+ self._handlers = dict()
+ if hasattr(pkgutil, 'walk_packages'):
+ submodules = pkgutil.walk_packages(path=__path__)
+ else:
+ # python 2.4
+ import glob
+ submodules = []
+ for path in __path__:
+ for submodule in glob.glob(os.path.join(path, "*.py")):
+ mod = os.path.splitext(os.path.basename(submodule))[0]
+ if mod not in ['__init__']:
+ submodules.append((None, mod, True))
+
+ for submodule in submodules:
+ if submodule[1] == 'base':
+ continue
+ module = getattr(__import__("%s.%s" %
+ (__name__,
+ submodule[1])).Client.Tools.POSIX,
+ submodule[1])
+ hdlr = getattr(module, "POSIX" + submodule[1])
+ if POSIXTool in hdlr.__mro__:
+ # figure out what entry type this handler handles
+ etype = hdlr.__name__[5:].lower()
+ self._handlers[etype] = hdlr(self.logger,
+ self.setup,
+ self.config)
+
+ def canVerify(self, entry):
+ if not Bcfg2.Client.Tools.Tool.canVerify(self, entry):
+ return False
+ if not self._handlers[entry.get("type")].fully_specified(entry):
+ self.logger.error('POSIX: Cannot verify incomplete entry %s. '
+ 'Try running bcfg2-lint.' %
+ entry.get('name'))
+ return False
+ return True
+
+ def canInstall(self, entry):
+ """Check if entry is complete for installation."""
+ if not Bcfg2.Client.Tools.Tool.canInstall(self, entry):
+ return False
+ if not self._handlers[entry.get("type")].fully_specified(entry):
+ self.logger.error('POSIX: Cannot install incomplete entry %s. '
+ 'Try running bcfg2-lint.' %
+ entry.get('name'))
+ return False
+ return True
+
+ def InstallPath(self, entry):
+ """Dispatch install to the proper method according to type"""
+ self.logger.debug("POSIX: Installing entry %s:%s:%s" %
+ (entry.tag, entry.get("type"), entry.get("name")))
+ self._paranoid_backup(entry)
+ return self._handlers[entry.get("type")].install(entry)
+
+ def VerifyPath(self, entry, modlist):
+ """Dispatch verify to the proper method according to type"""
+ self.logger.debug("POSIX: Verifying entry %s:%s:%s" %
+ (entry.tag, entry.get("type"), entry.get("name")))
+ ret = self._handlers[entry.get("type")].verify(entry, modlist)
+ if self.setup['interactive'] and not ret:
+ entry.set('qtext',
+ '%s\nInstall %s %s: (y/N) ' %
+ (entry.get('qtext', ''),
+ entry.get('type'), entry.get('name')))
+ return ret
+
+ def _prune_old_backups(self, entry):
+ bkupnam = entry.get('name').replace('/', '_')
+ bkup_re = re.compile(bkupnam + \
+ r'_\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6}$')
+ # current list of backups for this file
+ try:
+ bkuplist = [f for f in os.listdir(self.ppath) if
+ bkup_re.match(f)]
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to create backup list in %s: %s" %
+ (self.ppath, err))
+ return
+ bkuplist.sort()
+ while len(bkuplist) >= int(self.max_copies):
+ # remove the oldest backup available
+ oldest = bkuplist.pop(0)
+ self.logger.info("POSIX: Removing old backup %s" % oldest)
+ try:
+ os.remove(os.path.join(self.ppath, oldest))
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to remove old backup %s: %s" %
+ (os.path.join(self.ppath, oldest), err))
+
+ def _paranoid_backup(self, entry):
+ if (entry.get("paranoid", 'false').lower() == 'true' and
+ self.setup.get("paranoid", False) and
+ entry.get('current_exists', 'true') == 'true' and
+ not os.path.isdir(entry.get("name"))):
+ self._prune_old_backups(entry)
+ bkupnam = "%s_%s" % (entry.get('name').replace('/', '_'),
+ datetime.isoformat(datetime.now()))
+ bfile = os.path.join(self.ppath, bkupnam)
+ try:
+ shutil.copy(entry.get('name'), bfile)
+ self.logger.info("POSIX: Backup of %s saved to %s" %
+ (entry.get('name'), bfile))
+ except IOError:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to create backup file for %s: "
+ "%s" % (entry.get('name'), err))
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
new file mode 100644
index 000000000..1ec1d36d5
--- /dev/null
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
@@ -0,0 +1,639 @@
+import os
+import sys
+import pwd
+import grp
+import stat
+import shutil
+import Bcfg2.Client.Tools
+import Bcfg2.Client.XML
+
+try:
+ import selinux
+ has_selinux = True
+except ImportError:
+ has_selinux = False
+
+try:
+ import posix1e
+ has_acls = True
+except ImportError:
+ has_acls = False
+
+# map between dev_type attribute and stat constants
+device_map = dict(block=stat.S_IFBLK,
+ char=stat.S_IFCHR,
+ fifo=stat.S_IFIFO)
+
+# map between permissions characters and numeric ACL constants
+acl_map = dict(r=posix1e.ACL_READ,
+ w=posix1e.ACL_WRITE,
+ x=posix1e.ACL_EXECUTE)
+
+class POSIXTool(Bcfg2.Client.Tools.Tool):
+ def fully_specified(self, entry):
+ # checking is done by __req__
+ return True
+
+ def verify(self, entry, modlist):
+ if not self._verify_metadata(entry):
+ return False
+ if entry.get('recursive', 'false').lower() == 'true':
+ # verify ownership information recursively
+ for root, dirs, files in os.walk(entry.get('name')):
+ for p in dirs + files:
+ if not self._verify_metadata(entry,
+ path=os.path.join(root, p)):
+ return False
+ return True
+
+ def install(self, entry):
+ plist = [entry.get('name')]
+ rv = True
+ rv &= self._set_perms(entry)
+ if entry.get('recursive', 'false').lower() == 'true':
+ # set metadata recursively
+ for root, dirs, files in os.walk(entry.get('name')):
+ for path in dirs + files:
+ rv &= self._set_perms(entry, path=os.path.join(root, path))
+ return rv
+
+ def _exists(self, entry, remove=False):
+ try:
+ # check for existing paths and optionally remove them
+ ondisk = os.lstat(entry.get('name'))
+ if remove:
+ if os.path.isdir(entry.get('name')):
+ rm = shutil.rmtree
+ else:
+ rm = os.unlink
+ try:
+ rm(entry.get('name'))
+ return False
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.warning('POSIX: Failed to unlink %s: %s' %
+ (entry.get('name'), err))
+ return ondisk # probably still exists
+ else:
+ return ondisk
+ except OSError:
+ return False
+
+ def _set_perms(self, entry, path=None):
+ if path is None:
+ path = entry.get("name")
+
+ rv = True
+ if entry.get("owner") and entry.get("group"):
+ try:
+ self.logger.debug("POSIX: Setting ownership of %s to %s:%s" %
+ (path,
+ self._norm_entry_uid(entry),
+ self._norm_entry_gid(entry)))
+ os.chown(path, self._norm_entry_uid(entry),
+ self._norm_entry_gid(entry))
+ except KeyError:
+ self.logger.error('POSIX: Failed to change ownership of %s' %
+ path)
+ rv = False
+ os.chown(path, 0, 0)
+ except OSError:
+ self.logger.error('POSIX: Failed to change ownership of %s' %
+ path)
+ rv = False
+
+ if entry.get("perms"):
+ configPerms = int(entry.get('perms'), 8)
+ if entry.get('dev_type'):
+ configPerms |= device_map[entry.get('dev_type')]
+ try:
+ self.logger.debug("POSIX: Setting permissions on %s to %s" %
+ (path, oct(configPerms)))
+ os.chmod(path, configPerms)
+ except (OSError, KeyError):
+ self.logger.error('POSIX: Failed to change permissions on %s' %
+ path)
+ rv = False
+
+ if entry.get('mtime'):
+ try:
+ os.utime(entry.get('name'), (int(entry.get('mtime')),
+ int(entry.get('mtime'))))
+ except OSError:
+ self.logger.error("POSIX: Failed to set mtime of %s" % path)
+ rv = False
+
+ rv &= self._set_secontext(entry, path=path)
+ rv &= self._set_acls(entry, path=path)
+ return rv
+
+
+ def _set_acls(self, entry, path=None):
+ """ set POSIX ACLs on the file on disk according to the config """
+ if not has_acls:
+ if entry.findall("ACL"):
+ self.logger.debug("POSIX: ACLs listed for %s but no pylibacl "
+ "library installed" % entry.get('name'))
+ return True
+
+ if path is None:
+ path = entry.get("name")
+
+ try:
+ acl = posix1e.ACL(file=path)
+ except IOError:
+ err = sys.exc_info()[1]
+ if err.errno == 95:
+ # fs is mounted noacl
+ self.logger.error("POSIX: Cannot set ACLs on filesystem "
+ "mounted without ACL support: %s" % path)
+ else:
+ self.logger.error("POSIX: Error getting current ACLS on %s: %s"
+ % (path, err))
+ return False
+ # clear ACLs out so we start fresh -- way easier than trying
+ # to add/remove/modify ACLs
+ for aclentry in acl:
+ if aclentry.tag_type in [posix1e.ACL_USER, posix1e.ACL_GROUP]:
+ acl.delete_entry(aclentry)
+ if os.path.isdir(path):
+ defacl = posix1e.ACL(filedef=path)
+ if not defacl.valid():
+ # when a default ACL is queried on a directory that
+ # has no default ACL entries at all, you get an empty
+ # ACL, which is not valid. in this circumstance, we
+ # just copy the access ACL to get a base valid ACL
+ # that we can add things to.
+ defacl = posix1e.ACL(acl=acl)
+ else:
+ for aclentry in defacl:
+ if aclentry.tag_type in [posix1e.ACL_USER,
+ posix1e.ACL_GROUP]:
+ defacl.delete_entry(aclentry)
+ else:
+ defacl = None
+
+ for aclkey, perms in self._list_entry_acls(entry).items():
+ atype, scope, qualifier = aclkey
+ if atype == "default":
+ if defacl is None:
+ self.logger.warning("POSIX: Cannot set default ACLs on "
+ "non-directory %s" % path)
+ continue
+ entry = posix1e.Entry(defacl)
+ else:
+ entry = posix1e.Entry(acl)
+ for perm in acl_map.values():
+ if perm & perms:
+ entry.permset.add(perm)
+ entry.tag_type = scope
+ try:
+ if scope == posix1e.ACL_USER:
+ scopename = "user"
+ entry.qualifier = self._norm_uid(qualifier)
+ elif scope == posix1e.ACL_GROUP:
+ scopename = "group"
+ entry.qualifier = self._norm_gid(qualifier)
+ except (OSError, KeyError):
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Could not resolve %s %s: %s" %
+ (scopename, qualifier, err))
+ continue
+ acl.calc_mask()
+
+ def _apply_acl(acl, path, atype=posix1e.ACL_TYPE_ACCESS):
+ if atype == posix1e.ACL_TYPE_ACCESS:
+ atype_str = "access"
+ else:
+ atype_str = "default"
+ if acl.valid():
+ self.logger.debug("POSIX: Applying %s ACL to %s:" % (atype_str,
+ path))
+ for line in str(acl).splitlines():
+ self.logger.debug(" " + line)
+ try:
+ acl.applyto(path, atype)
+ return True
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to set ACLs on %s: %s" %
+ (path, err))
+ return False
+ else:
+ self.logger.warning("POSIX: %s ACL created for %s was invalid:"
+ % (atype_str.title(), path))
+ for line in str(acl).splitlines():
+ self.logger.warning(" " + line)
+ return False
+
+ rv = _apply_acl(acl, path)
+ if defacl:
+ defacl.calc_mask()
+ rv &= _apply_acl(defacl, path, posix1e.ACL_TYPE_DEFAULT)
+ return rv
+
+ def _set_secontext(self, entry, path=None):
+ """ set the SELinux context of the file on disk according to the
+ config"""
+ if not has_selinux:
+ return True
+
+ if path is None:
+ path = entry.get("name")
+ context = entry.get("secontext")
+ if context is None:
+ # no context listed
+ return True
+
+ if context == '__default__':
+ try:
+ selinux.restorecon(path)
+ rv = True
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to restore SELinux context "
+ "for %s: %s" % (path, err))
+ rv = False
+ else:
+ try:
+ rv = selinux.lsetfilecon(path, context) == 0
+ except:
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Failed to restore SELinux context "
+ "for %s: %s" % (path, err))
+ rv = False
+ return rv
+
+ def _norm_gid(self, gid):
+ """ This takes a group name or gid and returns the
+ corresponding gid. """
+ try:
+ return int(gid)
+ except ValueError:
+ return int(grp.getgrnam(gid)[2])
+
+ def _norm_entry_gid(self, entry):
+ try:
+ return self._norm_gid(entry.get('group'))
+ except (OSError, KeyError):
+ err = sys.exc_info()[1]
+ self.logger.error('POSIX: GID normalization failed for %s on %s: %s'
+ % (entry.get('group'), entry.get('name'), err))
+ return 0
+
+ def _norm_uid(self, uid):
+ """ This takes a username or uid and returns the
+ corresponding uid. """
+ try:
+ return int(uid)
+ except ValueError:
+ return int(pwd.getpwnam(uid)[2])
+
+ def _norm_entry_uid(self, entry):
+ try:
+ return self._norm_uid(entry.get("owner"))
+ except (OSError, KeyError):
+ err = sys.exc_info()[1]
+ self.logger.error('POSIX: UID normalization failed for %s on %s: %s'
+ % (entry.get('owner'), entry.get('name'), err))
+ return 0
+
+ def _norm_acl_perms(self, perms):
+ """ takes a representation of an ACL permset and returns a digit
+ representing the permissions entailed by it. representations can
+ either be a single octal digit, a string of up to three 'r',
+ 'w', 'x', or '-' characters, or a posix1e.Permset object"""
+ if hasattr(perms, 'test'):
+ # Permset object
+ return sum([p for p in acl_map.values()
+ if perms.test(p)])
+
+ try:
+ # single octal digit
+ rv = int(perms)
+ if rv > 0 and rv < 8:
+ return rv
+ else:
+ self.logger.error("POSIX: Permissions digit out of range in "
+ "ACL: %s" % perms)
+ return 0
+ except ValueError:
+ # couldn't be converted to an int; process as a string
+ if len(perms) > 3:
+ self.logger.error("POSIX: Permissions string too long in ACL: "
+ "%s" % perms)
+ return 0
+ rv = 0
+ for char in perms:
+ if char == '-':
+ continue
+ elif char not in acl_map:
+ self.logger.warning("POSIX: Unknown permissions character "
+ "in ACL: %s" % char)
+ elif rv & acl_map[char]:
+ self.logger.warning("POSIX: Duplicate permissions "
+ "character in ACL: %s" % perms)
+ else:
+ rv |= acl_map[char]
+ return rv
+
+ def _acl2string(self, aclkey, perms):
+ atype, scope, qualifier = aclkey
+ acl_str = []
+ if atype == 'default':
+ acl_str.append(atype)
+ if scope == posix1e.ACL_USER:
+ acl_str.append("user")
+ elif scope == posix1e.ACL_GROUP:
+ acl_str.append("group")
+ acl_str.append(qualifier)
+ acl_str.append(self._acl_perm2string(perms))
+ return ":".join(acl_str)
+
+ def _acl_perm2string(self, perm):
+ rv = []
+ for char in 'rwx':
+ if acl_map[char] & perm:
+ rv.append(char)
+ else:
+ rv.append('-')
+ return ''.join(rv)
+
+ def _gather_data(self, path):
+ try:
+ ondisk = os.stat(path)
+ except OSError:
+ self.logger.debug("POSIX: %s does not exist" % path)
+ return (False, None, None, None, None, None)
+
+ try:
+ owner = str(ondisk[stat.ST_UID])
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.debug("POSIX: Could not get current owner of %s: %s" %
+ (path, err))
+ owner = None
+ except KeyError:
+ self.logger.error('POSIX: User resolution failed for %s' % path)
+ owner = None
+
+ try:
+ group = str(ondisk[stat.ST_GID])
+ except (OSError, KeyError):
+ err = sys.exc_info()[1]
+ self.logger.debug("POSIX: Could not get current group of %s: %s" %
+ (path, err))
+ group = None
+ except KeyError:
+ self.logger.error('POSIX: Group resolution failed for %s' % path)
+ group = None
+
+ try:
+ perms = oct(ondisk[stat.ST_MODE])[-4:]
+ except (OSError, KeyError, TypeError):
+ err = sys.exc_info()[1]
+ self.logger.debug("POSIX: Could not get current permissions of %s: "
+ "%s" % (path, err))
+ perms = None
+
+ if has_selinux:
+ try:
+ secontext = selinux.getfilecon(path)[1].split(":")[2]
+ except (OSError, KeyError):
+ err = sys.exc_info()[1]
+ self.logger.debug("POSIX: Could not get current SELinux "
+ "context of %s: %s" % (path, err))
+ secontext = None
+ else:
+ secontext = None
+
+ if has_acls:
+ acls = self._list_file_acls(path)
+ else:
+ acls = None
+ return (ondisk, owner, group, perms, secontext, acls)
+
+ def _verify_metadata(self, entry, path=None):
+ """ generic method to verify perms, owner, group, secontext, acls,
+ and mtime """
+ # allow setting an alternate path for recursive permissions checking
+ if path is None:
+ path = entry.get('name')
+ attrib = dict()
+ ondisk, attrib['current_owner'], attrib['current_group'], \
+ attrib['current_perms'], attrib['current_secontext'], acls = \
+ self._gather_data(path)
+
+ if not ondisk:
+ entry.set('current_exists', 'false')
+ return False
+
+ # we conditionally verify every bit of metadata only if it's
+ # specified on the entry. consequently, canVerify() and
+ # fully_specified() are preconditions of _verify_metadata(),
+ # since they will ensure that everything that needs to be
+ # specified actually is. this lets us gracefully handle
+ # symlink and hardlink entries, which have SELinux contexts
+ # but not other permissions, optional secontext and mtime
+ # attrs, and so on.
+ configOwner, configGroup, configPerms, mtime = None, None, None, -1
+ if entry.get('mtime', '-1') != '-1':
+ mtime = str(ondisk[stat.ST_MTIME])
+ if entry.get("owner"):
+ configOwner = str(self._norm_entry_uid(entry))
+ if entry.get("group"):
+ configGroup = str(self._norm_entry_gid(entry))
+ if entry.get("perms"):
+ while len(entry.get('perms', '')) < 4:
+ entry.set('perms', '0' + entry.get('perms', ''))
+ configPerms = int(entry.get('perms'), 8)
+
+ errors = []
+ if configOwner and attrib['current_owner'] != configOwner:
+ errors.append("Owner for path %s is incorrect. "
+ "Current owner is %s but should be %s" %
+ (path, attrib['current_owner'], entry.get('owner')))
+
+ if configGroup and attrib['current_group'] != configGroup:
+ errors.append("Group for path %s is incorrect. "
+ "Current group is %s but should be %s" %
+ (path, attrib['current_group'], entry.get('group')))
+
+ if (configPerms and
+ oct(int(attrib['current_perms'], 8)) != oct(configPerms)):
+ errors.append("Permissions for path %s are incorrect. "
+ "Current permissions are %s but should be %s" %
+ (path, attrib['current_perms'], entry.get('perms')))
+
+ if entry.get('mtime'):
+ attrib['current_mtime'] = mtime
+ if mtime != entry.get('mtime', '-1'):
+ errors.append("mtime for path %s is incorrect. "
+ "Current mtime is %s but should be %s" %
+ (path, mtime, entry.get('mtime')))
+
+ if has_selinux and entry.get("secontext"):
+ if entry.get("secontext") == "__default__":
+ configContext = selinux.matchpathcon(path, 0)[1].split(":")[2]
+ else:
+ configContext = entry.get("secontext")
+ if attrib['current_secontext'] != configContext:
+ errors.append("SELinux context for path %s is incorrect. "
+ "Current context is %s but should be %s" %
+ (path, attrib['current_secontext'],
+ configContext))
+
+ if errors:
+ for error in errors:
+ self.logger.debug("POSIX: " + error)
+ entry.set('qtext', "\n".join([entry.get('qtext', '')] + errors))
+ if path == entry.get("name"):
+ for attr, val in attrib.items():
+ entry.set(attr, val)
+
+ aclVerifies = self._verify_acls(entry, path=path)
+ return aclVerifies and len(errors) == 0
+
+ def _list_entry_acls(self, entry):
+ wanted = dict()
+ for acl in entry.findall("ACL"):
+ if acl.get("scope") == "user":
+ scope = posix1e.ACL_USER
+ elif acl.get("scope") == "group":
+ scope = posix1e.ACL_GROUP
+ else:
+ self.logger.error("POSIX: Unknown ACL scope %s" %
+ acl.get("scope"))
+ continue
+ wanted[(acl.get("type"), scope, acl.get(acl.get("scope")))] = \
+ self._norm_acl_perms(acl.get('perms'))
+ return wanted
+
+ def _list_file_acls(self, path):
+ def _process_acl(acl, atype):
+ try:
+ if acl.tag_type == posix1e.ACL_USER:
+ qual = pwd.getpwuid(acl.qualifier)[0]
+ elif acl.tag_type == posix1e.ACL_GROUP:
+ qual = grp.getgrgid(acl.qualifier)[0]
+ else:
+ return
+ except (OSError, KeyError):
+ err = sys.exc_info()[1]
+ self.logger.error("POSIX: Lookup of %s %s failed: %s" %
+ (scope, acl.qualifier, err))
+ qual = acl.qualifier
+ existing[(atype, acl.tag_type, qual)] = \
+ self._norm_acl_perms(acl.permset)
+
+ existing = dict()
+ try:
+ for acl in posix1e.ACL(file=path):
+ _process_acl(acl, "access")
+ except IOError:
+ err = sys.exc_info()[1]
+ if err.errno == 95:
+ # fs is mounted noacl
+ self.logger.debug("POSIX: Filesystem mounted without ACL "
+ "support: %s" % path)
+ else:
+ self.logger.error("POSIX: Error getting current ACLS on %s: %s"
+ % (path, err))
+ return existing
+
+ if os.path.isdir(path):
+ for acl in posix1e.ACL(filedef=path):
+ _process_acl(acl, "default")
+ return existing
+
+ def _verify_acls(self, entry, path=None):
+ if not has_acls:
+ if entry.findall("ACL"):
+ self.logger.debug("POSIX: ACLs listed for %s but no pylibacl "
+ "library installed" % entry.get('name'))
+ return True
+
+ if path is None:
+ path = entry.get("name")
+
+ # create lists of normalized representations of the ACLs we want
+ # and the ACLs we have. this will make them easier to compare
+ # than trying to mine that data out of the ACL objects and XML
+ # objects and compare it at the same time.
+ wanted = self._list_entry_acls(entry)
+ existing = self._list_file_acls(path)
+
+ missing = []
+ extra = []
+ wrong = []
+ for aclkey, perms in wanted.items():
+ if aclkey not in existing:
+ missing.append(self._acl2string(aclkey, perms))
+ elif existing[aclkey] != perms:
+ wrong.append((self._acl2string(aclkey, perms),
+ self._acl2string(aclkey, existing[aclkey])))
+ if path == entry.get("name"):
+ atype, scope, qual = aclkey
+ aclentry = Bcfg2.Client.XML.Element("ACL", type=atype,
+ perms=str(perms))
+ if scope == posix1e.ACL_USER:
+ aclentry.set("scope", "user")
+ elif scope == posix1e.ACL_GROUP:
+ aclentry.set("scope", "group")
+ else:
+ self.logger.debug("POSIX: Unknown ACL scope %s on %s" %
+ (scope, path))
+ continue
+ aclentry.set(aclentry.get("scope"), qual)
+ entry.append(aclentry)
+
+ for aclkey, perms in existing.items():
+ if aclkey not in wanted:
+ extra.append(self._acl2string(aclkey, perms))
+
+ msg = []
+ if missing:
+ msg.append("%s ACLs are missing: %s" % (len(missing),
+ ", ".join(missing)))
+ if wrong:
+ msg.append("%s ACLs are wrong: %s" %
+ (len(wrong),
+ "; ".join(["%s should be %s" % (e, w)
+ for w, e in wrong])))
+ if extra:
+ msg.append("%s extra ACLs: %s" % (len(extra), ", ".join(extra)))
+
+ if msg:
+ msg.insert(0, "POSIX: ACLs for %s are incorrect." % path)
+ self.logger.debug(msg[0])
+ for line in msg[1:]:
+ self.logger.debug(" " + line)
+ entry.set('qtext', "\n".join([entry.get("qtext", '')] + msg))
+ return False
+ return True
+
+ def _makedirs(self, entry, path=None):
+ """ os.makedirs helpfully creates all parent directories for
+ us, but it sets permissions according to umask, which is
+ probably wrong. we need to find out which directories were
+ created and set permissions on those
+ (http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125) """
+ created = []
+ if path is None:
+ path = entry.get("name")
+ cur = path
+ while cur != '/':
+ if not os.path.exists(cur):
+ created.append(cur)
+ cur = os.path.dirname(cur)
+ rv = True
+ try:
+ os.makedirs(path)
+ except OSError:
+ err = sys.exc_info()[1]
+ self.logger.error('POSIX: Failed to create directory %s: %s' %
+ (path, err))
+ rv = False
+ for cpath in created:
+ rv &= self._set_perms(entry, path=cpath)
+ return rv
diff --git a/src/lib/Bcfg2/Client/Tools/__init__.py b/src/lib/Bcfg2/Client/Tools/__init__.py
index e4a0ec220..1f191fce3 100644
--- a/src/lib/Bcfg2/Client/Tools/__init__.py
+++ b/src/lib/Bcfg2/Client/Tools/__init__.py
@@ -1,17 +1,27 @@
"""This contains all Bcfg2 Tool modules"""
import os
-import stat
import sys
-from subprocess import Popen, PIPE
+import stat
import time
+import pkgutil
+from subprocess import Popen, PIPE
import Bcfg2.Client.XML
from Bcfg2.Bcfg2Py3k import input
-__all__ = [tool.split('.')[0] \
- for tool in os.listdir(os.path.dirname(__file__)) \
- if tool.endswith(".py") and tool != "__init__.py"]
-
+if hasattr(pkgutil, 'walk_packages'):
+ submodules = pkgutil.walk_packages(path=__path__)
+else:
+ # python 2.4
+ import glob
+ submodules = []
+ for path in __path__:
+ for submodule in glob.glob(os.path.join(path, "*.py")):
+ mod = os.path.splitext(os.path.basename(submodule))[0]
+ if mod not in ['__init__']:
+ submodules.append((None, mod, True))
+
+__all__ = [m[1] for m in submodules]
drivers = [item for item in __all__ if item not in ['rpmtools']]
default = [item for item in drivers if item not in ['RPM', 'Yum']]
@@ -37,7 +47,7 @@ class executor:
return (p.returncode, output.splitlines())
-class Tool:
+class Tool(object):
"""
All tools subclass this. It defines all interfaces that need to be defined.
"""
@@ -48,10 +58,6 @@ class Tool:
__important__ = []
def __init__(self, logger, setup, config):
- self.__important__ = [entry.get('name') \
- for struct in config for entry in struct \
- if entry.tag == 'Path' and \
- entry.get('important') in ['true', 'True']]
self.setup = setup
self.logger = logger
if not hasattr(self, '__ireq__'):
@@ -60,8 +66,15 @@ class Tool:
self.cmd = executor(logger)
self.modified = []
self.extra = []
- self.handled = [entry for struct in self.config for entry in struct \
- if self.handlesEntry(entry)]
+ self.__important__ = []
+ self.handled = []
+ for struct in config:
+ for entry in struct:
+ if (entry.tag == 'Path' and
+ entry.get('important', 'false').lower() == 'true'):
+ self.__important__.append(entry.get('name'))
+ if self.handlesEntry(entry):
+ self.handled.append(entry)
for filename in self.__execs__:
try:
mode = stat.S_IMODE(os.stat(filename)[stat.ST_MODE])
@@ -131,7 +144,7 @@ class Tool:
'''Build a list of potentially modified POSIX paths for this entry'''
return [entry.get('name') for struct in self.config.getchildren() \
for entry in struct.getchildren() \
- if entry.tag in ['Ignore', 'Path']]
+ if entry.tag == 'Path']
def gatherCurrentData(self, entry):
"""Default implementation of the information gathering routines."""
@@ -163,10 +176,10 @@ class Tool:
missing = self.missing_attrs(entry)
if missing:
- self.logger.error("Incomplete information for entry %s:%s; cannot verify" \
- % (entry.tag, entry.get('name')))
- self.logger.error("\t... due to absence of %s attribute(s)" % \
- (":".join(missing)))
+ self.logger.error("Cannot verify entry %s:%s due to missing "
+ "required attribute(s): %s" %
+ (entry.tag, entry.get('name'),
+ ", ".join(missing)))
try:
self.gatherCurrentData(entry)
except:
diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py
index e768284b9..320a219d3 100644
--- a/src/lib/Bcfg2/Options.py
+++ b/src/lib/Bcfg2/Options.py
@@ -933,6 +933,8 @@ CLIENT_COMMON_OPTIONS = \
drivers=CLIENT_DRIVERS,
dryrun=CLIENT_DRYRUN,
paranoid=CLIENT_PARANOID,
+ ppath=PARANOID_PATH,
+ max_copies=PARANOID_MAX_COPIES,
bundle=CLIENT_BUNDLE,
skipbundle=CLIENT_SKIPBUNDLE,
bundle_quick=CLIENT_BUNDLEQUICK,
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py
new file mode 100644
index 000000000..7d64c5a2e
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDevice.py
@@ -0,0 +1,139 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Device import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_device_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXDevice(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXDevice(unittest.TestCase):
+ def test_fully_specified(self):
+ ptool = get_device_object()
+ orig_entry = lxml.etree.Element("Path", name="/test", type="device",
+ dev_type="fifo")
+ self.assertTrue(ptool.fully_specified(orig_entry))
+ for dtype in ["block", "char"]:
+ for attr in ["major", "minor"]:
+ entry = copy.deepcopy(orig_entry)
+ entry.set("dev_type", dtype)
+ entry.set(attr, "0")
+ self.assertFalse(ptool.fully_specified(entry))
+ entry = copy.deepcopy(orig_entry)
+ entry.set("dev_type", dtype)
+ entry.set("major", "0")
+ entry.set("minor", "0")
+ self.assertTrue(ptool.fully_specified(entry))
+
+ @patch("os.major")
+ @patch("os.minor")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_exists, mock_minor, mock_major):
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="block", major="0", minor="10")
+ ptool = get_device_object()
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_minor.reset_mock()
+ mock_major.reset_mock()
+
+ mock_exists.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ reset()
+ mock_exists.return_value = MagicMock()
+ mock_major.return_value = 0
+ mock_minor.return_value = 10
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_exists.assert_called_with(entry)
+ mock_major.assert_called_with(mock_exists.return_value.st_rdev)
+ mock_minor.assert_called_with(mock_exists.return_value.st_rdev)
+
+ reset()
+ mock_exists.return_value = MagicMock()
+ mock_major.return_value = 0
+ mock_minor.return_value = 10
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_exists.assert_called_with(entry)
+ mock_major.assert_called_with(mock_exists.return_value.st_rdev)
+ mock_minor.assert_called_with(mock_exists.return_value.st_rdev)
+
+ reset()
+ mock_verify.return_value = True
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="fifo")
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ self.assertFalse(mock_major.called)
+ self.assertFalse(mock_minor.called)
+
+ @patch("os.makedev")
+ @patch("os.mknod")
+ @patch("Bcfg2.Client.Tools.POSIX.Device.POSIXDevice._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ def test_install(self, mock_install, mock_exists, mock_mknod, mock_makedev):
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="block", major="0", minor="10")
+ ptool = get_device_object()
+
+ mock_exists.return_value = False
+ mock_makedev.return_value = Mock()
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_makedev.assert_called_with(0, 10)
+ mock_mknod.assert_called_with(entry.get("name"),
+ device_map[entry.get("dev_type")] | 0644,
+ mock_makedev.return_value)
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_makedev.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_mknod.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+
+ mock_makedev.reset_mock()
+ mock_mknod.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_mknod.side_effect = None
+ entry = lxml.etree.Element("Path", name="/test", type="device",
+ perms='0644', owner='root', group='root',
+ dev_type="fifo")
+
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_mknod.assert_called_with(entry.get("name"),
+ device_map[entry.get("dev_type")] | 0644)
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py
new file mode 100644
index 000000000..021ed8113
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestDirectory.py
@@ -0,0 +1,154 @@
+import os
+import stat
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Directory import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_directory_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXDirectory(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXDirectory(unittest.TestCase):
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._exists")
+ def test_verify(self, mock_exists, mock_verify):
+ entry = lxml.etree.Element("Path", name="/test", type="directory",
+ perms='0644', owner='root', group='root')
+ ptool = get_directory_object()
+
+ mock_exists.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ mock_exists.reset_mock()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = stat.S_IFREG | 0644
+ mock_exists.return_value = exists_rv
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+
+ mock_exists.reset_mock()
+ mock_verify.return_value = False
+ exists_rv.__getitem__.return_value = stat.S_IFDIR | 0644
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ with patch("os.listdir") as mock_listdir:
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ entry.set("prune", "true")
+ orig_entry = copy.deepcopy(entry)
+
+ entries = ["foo", "bar", "bar/baz"]
+ mock_listdir.return_value = entries
+ modlist = [os.path.join(entry.get("name"), entries[0])]
+ self.assertFalse(ptool.verify(entry, modlist))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, modlist)
+ mock_listdir.assert_called_with(entry.get("name"))
+ expected = [os.path.join(entry.get("name"), e)
+ for e in entries
+ if os.path.join(entry.get("name"), e) not in modlist]
+ actual = [e.get("path") for e in entry.findall("Prune")]
+ self.assertItemsEqual(expected, actual)
+
+ mock_verify.reset_mock()
+ mock_exists.reset_mock()
+ mock_listdir.reset_mock()
+ entry = copy.deepcopy(orig_entry)
+ modlist = [os.path.join(entry.get("name"), e)
+ for e in entries]
+ self.assertTrue(ptool.verify(entry, modlist))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, modlist)
+ mock_listdir.assert_called_with(entry.get("name"))
+ self.assertEqual(len(entry.findall("Prune")), 0)
+
+ @patch("os.unlink")
+ @patch("shutil.rmtree")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.Directory.POSIXDirectory._makedirs")
+ def test_install(self, mock_makedirs, mock_exists, mock_install,
+ mock_rmtree, mock_unlink):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory", perms='0644',
+ owner='root', group='root')
+ ptool = get_directory_object()
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.mock_makedirs()
+
+ mock_makedirs.return_value = True
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+ mock_makedirs.assert_called_with(entry)
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = stat.S_IFREG | 0644
+ mock_exists.return_value = exists_rv
+ self.assertTrue(ptool.install(entry))
+ mock_unlink.assert_called_with(entry.get("name"))
+ mock_exists.assert_called_with(entry)
+ mock_makedirs.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ exists_rv.__getitem__.return_value = stat.S_IFDIR | 0644
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_install.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_install.assert_called_with(ptool, entry)
+
+ entry.set("prune", "true")
+ prune = ["/test/foo/bar/prune1", "/test/foo/bar/prune2"]
+ for path in prune:
+ lxml.etree.SubElement(entry, "Prune", path=path)
+
+ reset()
+ mock_install.return_value = True
+ with patch("os.path.isdir") as mock_isdir:
+ def isdir_rv(path):
+ if path.endswith("prune2"):
+ return True
+ else:
+ return False
+ mock_isdir.side_effect = isdir_rv
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry)
+ mock_install.assert_called_with(ptool, entry)
+ self.assertItemsEqual(mock_isdir.call_args_list,
+ [call(p) for p in prune])
+ mock_unlink.assert_called_with("/test/foo/bar/prune1")
+ mock_rmtree.assert_called_with("/test/foo/bar/prune2")
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
new file mode 100644
index 000000000..a2cd52dd5
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestFile.py
@@ -0,0 +1,318 @@
+import os
+import copy
+import binascii
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.File import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_file_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXFile(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXFile(unittest.TestCase):
+ def test_fully_specified(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_file_object()
+ self.assertFalse(ptool.fully_specified(entry))
+
+ entry.set("empty", "true")
+ self.assertTrue(ptool.fully_specified(entry))
+
+ entry.set("empty", "false")
+ entry.text = "text"
+ self.assertTrue(ptool.fully_specified(entry))
+
+ def test_is_string(self):
+ ptool = get_file_object()
+ for char in range(8) + range(14, 32):
+ self.assertFalse(ptool._is_string("foo" + chr(char) + "bar",
+ 'utf_8'))
+ for char in range(9, 14) + range(33, 128):
+ self.assertTrue(ptool._is_string("foo" + chr(char) + "bar",
+ 'utf_8'))
+ self.assertFalse(ptool._is_string("foo" + chr(128) + "bar",
+ 'ascii'))
+ ustr = '\xef\xa3\x91 + \xef\xa3\x92'
+ self.assertTrue(ptool._is_string(ustr, 'utf_8'))
+ self.assertFalse(ptool._is_string(ustr, 'ascii'))
+
+ def test_get_data(self):
+ orig_entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(encoding="ascii", ppath='/', max_copies=5)
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = binascii.b2a_base64("test")
+ entry.set("encoding", "base64")
+ self.assertEqual(ptool._get_data(entry), ("test", True))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.set("empty", "true")
+ self.assertEqual(ptool._get_data(entry), ("", False))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.text = "test"
+ self.assertEqual(ptool._get_data(entry), ("test", False))
+
+ ustr = u'\uf8d1 + \uf8d2'
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry), (ustr, False))
+
+ setup['encoding'] = "utf_8"
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+ entry = copy.deepcopy(orig_entry)
+ entry.text = ustr
+ self.assertEqual(ptool._get_data(entry),
+ ('\xef\xa3\x91 + \xef\xa3\x92', False))
+
+ @patch("__builtin__.open")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._exists")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_diffs")
+ def test_verify(self, mock_get_diffs, mock_get_data, mock_exists,
+ mock_verify, mock_open):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ setup = dict(interactive=False, ppath='/', max_copies=5)
+ ptool = get_file_object(posix=get_posix_object(setup=setup))
+
+ def reset():
+ mock_get_diffs.reset_mock()
+ mock_get_data.reset_mock()
+ mock_exists.reset_mock()
+ mock_verify.reset_mock()
+ mock_open.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_exists.return_value = False
+ mock_verify.return_value = True
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=False,
+ content="")
+
+ reset()
+ exists_rv = MagicMock()
+ exists_rv.__getitem__.return_value = 5
+ mock_exists.return_value = exists_rv
+ mock_get_data.return_value = ("test", True)
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=False,
+ is_binary=True,
+ content=None)
+
+ reset()
+ mock_get_data.return_value = ("test", False)
+ exists_rv.__getitem__.return_value = 4
+ entry.set("sensitive", "true")
+ open_rv = Mock()
+ open_rv.read.return_value = "tart"
+ mock_open.return_value = open_rv
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ open_rv.assert_any_call()
+ mock_get_diffs.assert_called_with(entry, interactive=False,
+ sensitive=True,
+ is_binary=False,
+ content="tart")
+
+ reset()
+ open_rv.read.return_value = "test"
+ mock_open.return_value = open_rv
+ self.assertTrue(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_verify.assert_called_with(ptool, entry, [])
+ mock_open.assert_called_with(entry.get("name"))
+ open_rv.assert_any_call()
+ self.assertFalse(mock_get_diffs.called)
+
+ reset()
+ mock_open.side_effect = IOError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_exists.assert_called_with(entry)
+ mock_open.assert_called_with(entry.get("name"))
+
+ @patch("os.fdopen")
+ @patch("tempfile.mkstemp")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._get_data")
+ def test_write_tmpfile(self, mock_get_data, mock_mkstemp, mock_fdopen):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+ newfile = "/foo/bar"
+
+ def reset():
+ mock_get_data.reset_mock()
+ mock_mkstemp.reset_mock()
+ mock_fdopen.reset_mock()
+
+ mock_get_data.return_value = ("test", False)
+ mock_mkstemp.return_value = (5, newfile)
+ self.assertEqual(ptool._write_tmpfile(entry), newfile)
+ mock_get_data.assert_called_with(entry)
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_fdopen.assert_called_with(5, 'w')
+ mock_fdopen.return_value.write.assert_called_with("test")
+
+ reset()
+ mock_mkstemp.side_effect = OSError
+ self.assertFalse(ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+
+ reset()
+ mock_mkstemp.side_effect = None
+ mock_fdopen.side_effect = OSError
+ self.assertFalse(ptool._write_tmpfile(entry))
+ mock_mkstemp.assert_called_with(prefix='test', dir='/')
+ mock_get_data.assert_called_with(entry)
+ mock_fdopen.assert_called_with(5, 'w')
+
+ @patch("os.rename")
+ @patch("os.unlink")
+ def test_rename_tmpfile(self, mock_unlink, mock_rename):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+ newfile = "/foo/bar"
+
+ self.assertTrue(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rename.side_effect = OSError
+ self.assertFalse(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ # even if the unlink fails, return false gracefully
+ mock_rename.reset_mock()
+ mock_unlink.reset_mock()
+ mock_unlink.side_effect = OSError
+ self.assertFalse(ptool._rename_tmpfile(newfile, entry))
+ mock_rename.assert_called_with(newfile, entry.get("name"))
+ mock_unlink.assert_called_with(newfile)
+
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._makedirs")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._set_perms")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._write_tmpfile")
+ @patch("Bcfg2.Client.Tools.POSIX.File.POSIXFile._rename_tmpfile")
+ def test_install(self, mock_rename, mock_write, mock_set_perms,
+ mock_makedirs, mock_install, mock_exists):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ perms='0644', owner='root', group='root')
+ ptool = get_file_object()
+
+ def reset():
+ mock_rename.reset_mock()
+ mock_write.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+ mock_install.reset_mock()
+ mock_exists.reset_mock()
+
+ mock_exists.return_value = False
+ mock_makedirs.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+
+ reset()
+ mock_makedirs.return_value = True
+ mock_write.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+
+ reset()
+ newfile = '/test.X987yS'
+ mock_write.return_value = newfile
+ mock_set_perms.return_value = False
+ mock_rename.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+
+ reset()
+ mock_rename.return_value = True
+ mock_install.return_value = False
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_install.return_value = True
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ mock_makedirs.assert_called_with(entry, path="/")
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ reset()
+ mock_exists.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with("/")
+ self.assertFalse(mock_makedirs.called)
+ mock_write.assert_called_with(entry)
+ mock_set_perms.assert_called_with(entry, path=newfile)
+ mock_rename.assert_called_with(newfile, entry)
+ mock_install.assert_called_with(ptool, entry)
+
+ def test_diff(self):
+ ptool = get_file_object()
+ content1 = "line1\nline2"
+ content2 = "line3"
+ rv = ["line1", "line2", "line3"]
+ func = Mock()
+ func.return_value = rv
+ self.assertItemsEqual(ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
+
+ func.reset_mock()
+ def slow_diff(content1, content2):
+ for i in range(1, 10):
+ time.sleep(5)
+ yield "line%s" % i
+ func.side_effect = slow_diff
+ self.assertFalse(ptool._diff(content1, content2, func), rv)
+ func.assert_called_with(["line1", "line2"], ["line3"])
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py
new file mode 100644
index 000000000..e663973c7
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestHardlink.py
@@ -0,0 +1,80 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Hardlink import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_hardlink_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXHardlink(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXHardlink(unittest.TestCase):
+ @patch("os.path.samefile")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_samefile):
+ entry = lxml.etree.Element("Path", name="/test", type="hardlink",
+ to="/dest")
+ ptool = get_hardlink_object()
+
+ mock_samefile.return_value = True
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_samefile.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_samefile.reset_mock()
+ mock_verify.reset_mock()
+ mock_samefile.side_effect = OSError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_samefile.assert_called_with(entry.get("name"),
+ entry.get("to"))
+
+ @patch("os.link")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Hardlink.POSIXHardlink._exists")
+ def test_install(self, mock_exists, mock_install, mock_link):
+ entry = lxml.etree.Element("Path", name="/test", type="hardlink",
+ to="/dest")
+ ptool = get_hardlink_object()
+
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_link.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_link.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_link.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_link.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py
new file mode 100644
index 000000000..38f3b6ee3
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestNonexistent.py
@@ -0,0 +1,88 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Nonexistent import *
+from Test__init import get_config, get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_nonexistent_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXNonexistent(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXNonexistent(unittest.TestCase):
+ @patch("os.path.lexists")
+ def test_verify(self, mock_lexists):
+ entry = lxml.etree.Element("Path", name="/test", type="nonexistent")
+ ptool = get_nonexistent_object()
+
+ for val in [True, False]:
+ mock_lexists.reset_mock()
+ mock_lexists.return_value = val
+ self.assertEqual(ptool.verify(entry, []), not val)
+ mock_lexists.assert_called_with(entry.get("name"))
+
+ @patch("os.rmdir")
+ @patch("os.remove")
+ @patch("shutil.rmtree")
+ def test_install(self, mock_rmtree, mock_remove, mock_rmdir):
+ entry = lxml.etree.Element("Path", name="/test", type="nonexistent")
+ ptool = get_nonexistent_object()
+
+ with patch("os.path.isdir") as mock_isdir:
+ def reset():
+ mock_isdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_rmdir.reset_mock()
+ mock_rmtree.reset_mock()
+
+ mock_isdir.return_value = False
+ self.assertTrue(ptool.install(entry))
+ mock_remove.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_remove.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_remove.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_isdir.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_rmdir.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_rmdir.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_rmdir.assert_called_with(entry.get("name"))
+
+ reset()
+ entry.set("recursive", "true")
+ self.assertTrue(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_rmtree.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ child_entry = lxml.etree.Element("Path", name="/test/foo",
+ type="nonexistent")
+ ptool = get_nonexistent_object(posix=get_posix_object(config=get_config([child_entry])))
+ mock_rmtree.side_effect = None
+ self.assertTrue(ptool.install(entry))
+ mock_rmtree.assert_called_with(entry.get("name"))
+
+ reset()
+ child_entry = lxml.etree.Element("Path", name="/test/foo",
+ type="file")
+ ptool = get_nonexistent_object(posix=get_posix_object(config=get_config([child_entry])))
+ mock_rmtree.side_effect = None
+ self.assertFalse(ptool.install(entry))
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py
new file mode 100644
index 000000000..94b74dd13
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestPermissions.py
@@ -0,0 +1,21 @@
+import os
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Permissions import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_permissions_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXPermissions(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXPermissions(unittest.TestCase):
+ # nothing to test!
+ pass
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py
new file mode 100644
index 000000000..a3ed9f68d
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/TestSymlink.py
@@ -0,0 +1,76 @@
+import os
+import copy
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Client.Tools.POSIX.Symlink import *
+from Test__init import get_posix_object
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_symlink_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXSymlink(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXSymlink(unittest.TestCase):
+ @patch("os.readlink")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.verify")
+ def test_verify(self, mock_verify, mock_readlink):
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="/dest")
+ ptool = get_symlink_object()
+
+ mock_readlink.return_value = entry.get("to")
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_readlink.return_value = "/bogus"
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+ mock_verify.assert_called_with(ptool, entry, [])
+
+ mock_readlink.reset_mock()
+ mock_verify.reset_mock()
+ mock_readlink.side_effect = OSError
+ self.assertFalse(ptool.verify(entry, []))
+ mock_readlink.assert_called_with(entry.get("name"))
+
+ @patch("os.symlink")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool.install")
+ @patch("Bcfg2.Client.Tools.POSIX.Symlink.POSIXSymlink._exists")
+ def test_install(self, mock_exists, mock_install, mock_symlink):
+ entry = lxml.etree.Element("Path", name="/test", type="symlink",
+ to="/dest")
+ ptool = get_symlink_object()
+
+ mock_exists.return_value = False
+ mock_install.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
+
+ mock_symlink.reset_mock()
+ mock_exists.reset_mock()
+ mock_install.reset_mock()
+ mock_symlink.side_effect = OSError
+ self.assertFalse(ptool.install(entry))
+ mock_exists.assert_called_with(entry, remove=True)
+ mock_symlink.assert_called_with(entry.get("to"), entry.get("name"))
+ mock_install.assert_called_with(ptool, entry)
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py
new file mode 100644
index 000000000..952bb02dd
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Test__init.py
@@ -0,0 +1,238 @@
+import os
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+import Bcfg2.Client.Tools.POSIX
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_config(entries):
+ config = lxml.etree.Element("Configuration")
+ bundle = lxml.etree.SubElement(config, "Bundle", name="test")
+ bundle.extend(entries)
+ return config
+
+def get_posix_object(logger=None, setup=None, config=None):
+ if config is None:
+ config = lxml.etree.Element("Configuration")
+ if not logger:
+ def print_msg(msg):
+ print(msg)
+ logger = Mock()
+ logger.error = Mock(side_effect=print_msg)
+ logger.warning = Mock(side_effect=print_msg)
+ logger.info = Mock(side_effect=print_msg)
+ logger.debug = Mock(side_effect=print_msg)
+ if not setup:
+ setup = MagicMock()
+ return Bcfg2.Client.Tools.POSIX.POSIX(logger, setup, config)
+
+
+class TestPOSIX(unittest.TestCase):
+ def test__init(self):
+ entries = [lxml.etree.Element("Path", name="test", type="file")]
+ p = get_posix_object(config=get_config(entries))
+ self.assertIsInstance(p, Bcfg2.Client.Tools.Tool)
+ self.assertIsInstance(p, Bcfg2.Client.Tools.POSIX.POSIX)
+ self.assertIn('Path', p.__req__)
+ self.assertGreater(len(p.__req__['Path']), 0)
+ self.assertGreater(len(p.__handles__), 0)
+ self.assertItemsEqual(p.handled, entries)
+
+ @patch("Bcfg2.Client.Tools.Tool.canVerify")
+ def test_canVerify(self, mock_canVerify):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ p = get_posix_object()
+
+ # first, test superclass canVerify failure
+ mock_canVerify.return_value = False
+ self.assertFalse(p.canVerify(entry))
+ mock_canVerify.assert_called_with(p, entry)
+
+ # next, test fully_specified failure
+ p.logger.error.reset_mock()
+ mock_canVerify.reset_mock()
+ mock_canVerify.return_value = True
+ mock_fully_spec = Mock()
+ mock_fully_spec.return_value = False
+ p._handlers[entry.get("type")].fully_specified = mock_fully_spec
+ self.assertFalse(p.canVerify(entry))
+ mock_canVerify.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertTrue(p.logger.error.called)
+
+ # finally, test success
+ p.logger.error.reset_mock()
+ mock_canVerify.reset_mock()
+ mock_fully_spec.reset_mock()
+ mock_fully_spec.return_value = True
+ self.assertTrue(p.canVerify(entry))
+ mock_canVerify.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertFalse(p.logger.error.called)
+
+ @patch("Bcfg2.Client.Tools.Tool.canInstall")
+ def test_canInstall(self, mock_canInstall):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ p = get_posix_object()
+
+ # first, test superclass canInstall failure
+ mock_canInstall.return_value = False
+ self.assertFalse(p.canInstall(entry))
+ mock_canInstall.assert_called_with(p, entry)
+
+ # next, test fully_specified failure
+ p.logger.error.reset_mock()
+ mock_canInstall.reset_mock()
+ mock_canInstall.return_value = True
+ mock_fully_spec = Mock()
+ mock_fully_spec.return_value = False
+ p._handlers[entry.get("type")].fully_specified = mock_fully_spec
+ self.assertFalse(p.canInstall(entry))
+ mock_canInstall.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertTrue(p.logger.error.called)
+
+ # finally, test success
+ p.logger.error.reset_mock()
+ mock_canInstall.reset_mock()
+ mock_fully_spec.reset_mock()
+ mock_fully_spec.return_value = True
+ self.assertTrue(p.canInstall(entry))
+ mock_canInstall.assert_called_with(p, entry)
+ mock_fully_spec.assert_called_with(entry)
+ self.assertFalse(p.logger.error.called)
+
+ def test_InstallPath(self):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ p = get_posix_object()
+
+ mock_install = Mock()
+ mock_install.return_value = True
+ p._handlers[entry.get("type")].install = mock_install
+ self.assertTrue(p.InstallPath(entry))
+ mock_install.assert_called_with(entry)
+
+ def test_VerifyPath(self):
+ entry = lxml.etree.Element("Path", name="test", type="file")
+ modlist = []
+ p = get_posix_object()
+
+ mock_verify = Mock()
+ mock_verify.return_value = True
+ p._handlers[entry.get("type")].verify = mock_verify
+ self.assertTrue(p.VerifyPath(entry, modlist))
+ mock_verify.assert_called_with(entry, modlist)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = False
+ p.setup.__getitem__.return_value = True
+ self.assertFalse(p.VerifyPath(entry, modlist))
+ self.assertIsNotNone(entry.get('qtext'))
+
+ @patch('os.remove')
+ def test_prune_old_backups(self, mock_remove):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ setup = dict(ppath='/', max_copies=5, paranoid=True)
+ posix = get_posix_object(setup=setup)
+
+ remove = ["_etc_foo_2012-07-20T04:13:22.364989",
+ "_etc_foo_2012-07-31T04:13:23.894958",
+ "_etc_foo_2012-07-17T04:13:22.493316",]
+ keep = ["_etc_foo_bar_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-04T04:13:22.519978",
+ "_etc_Foo_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-06T04:13:22.519978",
+ "_etc_foo_2012-08-03T04:13:22.191895",
+ "_etc_test_2011-08-07T04:13:22.519978",
+ "_etc_foo_2012-08-07T04:13:22.519978",]
+
+ with patch('os.listdir') as mock_listdir:
+ mock_listdir.side_effect = OSError
+ posix._prune_old_backups(entry)
+ self.assertTrue(posix.logger.error.called)
+ self.assertFalse(mock_remove.called)
+ mock_listdir.assert_called_with(setup['ppath'])
+
+ mock_listdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_listdir.side_effect = None
+ mock_listdir.return_value = keep + remove
+
+ posix._prune_old_backups(entry)
+ mock_listdir.assert_called_with(setup['ppath'])
+ self.assertItemsEqual(mock_remove.call_args_list,
+ [call(os.path.join(setup['ppath'], p))
+ for p in remove])
+
+ mock_listdir.reset_mock()
+ mock_remove.reset_mock()
+ mock_remove.side_effect = OSError
+ posix.logger.error.reset_mock()
+ # test to ensure that we call os.remove() for all files that
+ # need to be removed even if we get an error
+ posix._prune_old_backups(entry)
+ mock_listdir.assert_called_with(setup['ppath'])
+ self.assertItemsEqual(mock_remove.call_args_list,
+ [call(os.path.join(setup['ppath'], p))
+ for p in remove])
+ self.assertTrue(posix.logger.error.called)
+
+ @patch("shutil.copy")
+ @patch("Bcfg2.Client.Tools.POSIX.POSIX._prune_old_backups")
+ def test_paranoid_backup(self, mock_prune, mock_copy):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ setup = dict(ppath='/', max_copies=5, paranoid=False)
+ posix = get_posix_object(setup=setup)
+
+ # paranoid false globally
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # paranoid false on the entry
+ mock_prune.reset_mock()
+ setup['paranoid'] = True
+ posix = get_posix_object(setup=setup)
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ # entry does not exist on filesystem
+ mock_prune.reset_mock()
+ entry.set("paranoid", "true")
+ entry.set("current_exists", "false")
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+
+ with patch("os.path.isdir") as mock_isdir:
+ # entry is a directory on the filesystem
+ mock_prune.reset_mock()
+ entry.set("current_exists", "true")
+ mock_isdir.return_value = True
+ posix._paranoid_backup(entry)
+ self.assertFalse(mock_prune.called)
+ self.assertFalse(mock_copy.called)
+ mock_isdir.assert_called_with(entry.get("name"))
+
+ # test the actual backup now
+ mock_prune.reset_mock()
+ mock_isdir.return_value = False
+ posix._paranoid_backup(entry)
+ mock_isdir.assert_called_with(entry.get("name"))
+ mock_prune.assert_called_with(entry)
+ # it's basically impossible to test the shutil.copy() call
+ # exactly because the destination includes microseconds,
+ # so we just test it good enough
+ self.assertEqual(mock_copy.call_args[0][0],
+ entry.get("name"))
+ bkupnam = os.path.join(setup['ppath'],
+ entry.get('name').replace('/', '_')) + '_'
+ self.assertEqual(bkupnam,
+ mock_copy.call_args[0][1][:len(bkupnam)])
diff --git a/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
new file mode 100644
index 000000000..68007ca8e
--- /dev/null
+++ b/testsuite/Testlib/TestClient/TestTools/TestPOSIX/Testbase.py
@@ -0,0 +1,966 @@
+import os
+import copy
+import stat
+import unittest
+import lxml.etree
+from mock import Mock, MagicMock, patch
+import Bcfg2.Client.Tools
+from Bcfg2.Client.Tools.POSIX.base import *
+from Test__init import get_posix_object
+
+try:
+ import selinux
+ has_selinux = True
+except ImportError:
+ has_selinux = False
+
+try:
+ import posix1e
+ has_acls = True
+except ImportError:
+ has_acls = False
+
+def call(*args, **kwargs):
+ """ the Mock call object is a fairly recent addition, but it's
+ very very useful, so we create our own function to create Mock
+ calls """
+ return (args, kwargs)
+
+def get_posixtool_object(posix=None):
+ if posix is None:
+ posix = get_posix_object()
+ return POSIXTool(posix.logger, posix.setup, posix.config)
+
+class TestPOSIXTool(unittest.TestCase):
+ def test_fully_specified(self):
+ # fully_specified should do no checking on the abstract
+ # POSIXTool object
+ ptool = get_posixtool_object()
+ self.assertTrue(ptool.fully_specified(Mock()))
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._verify_metadata")
+ def test_verify(self, mock_verify):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_posixtool_object()
+ with patch('os.stat') as mock_stat, patch('os.walk') as mock_walk:
+ mock_stat.return_value = MagicMock()
+
+ mock_verify.return_value = False
+ self.assertFalse(ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ mock_verify.return_value = True
+ self.assertTrue(ptool.verify(entry, []))
+ mock_verify.assert_called_with(entry)
+
+ mock_verify.reset_mock()
+ entry.set("recursive", "true")
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+ self.assertTrue(ptool.verify(entry, []))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_verifies = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_verifies.extend([call(entry, path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_verify.call_args_list, all_verifies)
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_perms")
+ def test_install(self, mock_set_perms):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_posixtool_object()
+
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_set_perms.assert_called_with(entry)
+
+ mock_set_perms.reset_mock()
+ entry.set("recursive", "true")
+ with patch('os.walk') as mock_walk:
+ walk_rv = [("/", ["dir1", "dir2"], ["file1", "file2"]),
+ ("/dir1", ["dir3"], []),
+ ("/dir2", [], ["file3", "file4"])]
+ mock_walk.return_value = walk_rv
+
+ mock_set_perms.return_value = True
+ self.assertTrue(ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ all_set_perms = [call(entry)]
+ for root, dirs, files in walk_rv:
+ all_set_perms.extend([call(entry,
+ path=os.path.join(root, p))
+ for p in dirs + files])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ all_set_perms)
+
+ mock_walk.reset_mock()
+ mock_set_perms.reset_mock()
+
+ def set_perms_rv(entry, path=None):
+ if path == '/dir2/file3':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+
+ self.assertFalse(ptool.install(entry))
+ mock_walk.assert_called_with(entry.get("name"))
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ all_set_perms)
+
+ @patch("os.unlink")
+ @patch("shutil.rmtree")
+ def test_exists(self, mock_rmtree, mock_unlink):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ ptool = get_posixtool_object()
+ with patch('os.lstat') as mock_lstat, \
+ patch("os.path.isdir") as mock_isdir:
+ mock_lstat.side_effect = OSError
+ self.assertFalse(ptool._exists(entry))
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ rv = MagicMock()
+ mock_lstat.return_value = rv
+ mock_lstat.side_effect = None
+ self.assertEqual(ptool._exists(entry), rv)
+ mock_lstat.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_isdir.return_value = False
+ self.assertFalse(ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_unlink.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_rmtree.called)
+
+ mock_lstat.reset_mock()
+ mock_isdir.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_isdir.return_value = True
+ self.assertFalse(ptool._exists(entry, remove=True))
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ mock_isdir.reset_mock()
+ mock_lstat.reset_mock()
+ mock_unlink.reset_mock()
+ mock_rmtree.reset_mock()
+ mock_rmtree.side_effect = OSError
+ self.assertEqual(ptool._exists(entry, remove=True), rv)
+ mock_isdir.assert_called_with(entry.get('name'))
+ mock_lstat.assert_called_with(entry.get('name'))
+ mock_rmtree.assert_called_with(entry.get('name'))
+ self.assertFalse(mock_unlink.called)
+
+ @patch("os.chown")
+ @patch("os.chmod")
+ @patch("os.utime")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_uid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_gid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_acls")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_secontext")
+ def test_set_perms(self, mock_set_secontext, mock_set_acls, mock_norm_gid,
+ mock_norm_uid, mock_utime, mock_chmod, mock_chown):
+ ptool = get_posixtool_object()
+
+ def reset():
+ mock_set_secontext.reset_mock()
+ mock_set_acls.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_chmod.reset_mock()
+ mock_chown.reset_mock()
+ mock_utime.reset_mock()
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", to="/etc/bar",
+ type="symlink")
+ mock_set_acls.return_value = True
+ mock_set_secontext.return_value = True
+ self.assertTrue(ptool._set_perms(entry))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ entry = lxml.etree.Element("Path", name="/etc/foo", owner="owner",
+ group="group", perms="644", type="file")
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ reset()
+ self.assertTrue(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ self.assertFalse(mock_utime.called)
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ mtime = 1344459042
+ entry.set("mtime", str(mtime))
+ self.assertTrue(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ reset()
+ self.assertTrue(ptool._set_perms(entry, path='/etc/bar'))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with('/etc/bar', 10, 100)
+ mock_chmod.assert_called_with('/etc/bar', int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path='/etc/bar')
+ mock_set_acls.assert_called_with(entry, path='/etc/bar')
+
+ # test dev_type modification of perms, failure of chown
+ reset()
+ def chown_rv(path, owner, group):
+ if owner == 0 and group == 0:
+ return True
+ else:
+ raise KeyError
+ os.chown.side_effect = chown_rv
+ entry.set("type", "device")
+ entry.set("dev_type", device_map.keys()[0])
+ self.assertFalse(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 0, 0)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8) | device_map.values()[0])
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test failure of chmod
+ reset()
+ os.chown.side_effect = None
+ os.chmod.side_effect = OSError
+ entry.set("type", "file")
+ del entry.attrib["dev_type"]
+ self.assertFalse(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ # test that even when everything fails, we try to do it all.
+ # e.g., when chmod fails, we still try to apply acls, set
+ # selinux context, etc.
+ reset()
+ os.chown.side_effect = OSError
+ os.utime.side_effect = OSError
+ mock_set_acls.return_value = False
+ mock_set_secontext.return_value = False
+ self.assertFalse(ptool._set_perms(entry))
+ mock_norm_uid.assert_called_with(entry)
+ mock_norm_gid.assert_called_with(entry)
+ mock_chown.assert_called_with(entry.get("name"), 10, 100)
+ mock_chmod.assert_called_with(entry.get("name"),
+ int(entry.get("perms"), 8))
+ mock_utime.assert_called_with(entry.get("name"), (mtime, mtime))
+ mock_set_secontext.assert_called_with(entry, path=entry.get("name"))
+ mock_set_acls.assert_called_with(entry, path=entry.get("name"))
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_uid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_gid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_entry_acls")
+ def test_set_acls(self, mock_list_entry_acls, mock_norm_gid, mock_norm_uid):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ ptool = get_posixtool_object()
+
+ # disable acls for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertTrue(ptool._set_acls(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_acls = True
+
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.name = "remove"
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ acl.name = "keep"
+ file_acls.append(acl)
+ remove_acls = [a for a in file_acls if a.name == "remove"]
+
+ # build a set of ACLs listed on the entry as returned by
+ # _list_entry_acls()
+ entry_acls = {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+ mock_list_entry_acls.return_value = entry_acls
+ mock_norm_uid.return_value = 10
+ mock_norm_gid.return_value = 100
+
+ with patch("posix1e.ACL") as mock_ACL, \
+ patch("posix1e.Entry") as mock_Entry, \
+ patch("os.path.isdir") as mock_isdir:
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has three separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ acl_rv = MagicMock()
+ def mock_acl_rv(file=None, filedef=None, acl=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ elif acl:
+ return acl_rv
+
+ # set up the equally unreasonably complex return value for
+ # posix1e.Entry, which returns a new entry and adds it to
+ # an ACL, so we have to track the Mock objects it returns.
+ # why can't they just have an acl.add_entry() method?!?
+ acl_entries = []
+ def mock_entry_rv(acl):
+ rv = MagicMock()
+ rv.acl = acl
+ rv.permset = set()
+ acl_entries.append(rv)
+ return rv
+ mock_Entry.side_effect = mock_entry_rv
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_ACL.reset_mock()
+ mock_Entry.reset_mock()
+ fileacl_rv.reset_mock()
+
+ # test fs mounted noacl
+ mock_ACL.side_effect = IOError(95, "Operation not permitted")
+ self.assertFalse(ptool._set_acls(entry))
+
+ # test other error
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertFalse(ptool._set_acls(entry))
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ mock_isdir.return_value = True
+ self.assertTrue(ptool._set_acls(entry))
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=entry.get("name")),
+ call(filedef=entry.get("name"))])
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ self.assertItemsEqual(filedef_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_uid.assert_called_with("user")
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_ACCESS)
+ filedef_rv.calc_mask.assert_any_call()
+ filedef_rv.applyto.assert_called_with(entry.get("name"),
+ posix1e.ACL_TYPE_DEFAULT)
+
+ # build tuples of the Entry objects that were added to acl
+ # and defaacl so they're easier to compare for equality
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(filedef_rv, posix1e.ACL_USER, 10, 7),
+ (fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ reset()
+ # have to reassign these because they're iterators, and
+ # they've already been iterated over once
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ mock_list_entry_acls.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_isdir.return_value = False
+ acl_entries = []
+ self.assertTrue(ptool._set_acls(entry, path="/bin/bar"))
+ mock_ACL.assert_called_with(file="/bin/bar")
+ self.assertItemsEqual(fileacl_rv.delete_entry.call_args_list,
+ [call(a) for a in remove_acls])
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_norm_gid.assert_called_with("group")
+ fileacl_rv.calc_mask.assert_any_call()
+ fileacl_rv.applyto.assert_called_with("/bin/bar",
+ posix1e.ACL_TYPE_ACCESS)
+
+ added_acls = []
+ for acl in acl_entries:
+ added_acls.append((acl.acl, acl.tag_type, acl.qualifier,
+ sum(acl.permset)))
+ self.assertItemsEqual(added_acls,
+ [(fileacl_rv, posix1e.ACL_GROUP, 100, 5)])
+
+ @unittest.skipUnless(has_selinux, "SELinux not found, skipping")
+ def test_set_secontext(self):
+ entry = lxml.etree.Element("Path", name="/etc/foo", type="file")
+ ptool = get_posixtool_object()
+
+ # disable selinux for the initial test
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertTrue(ptool._set_secontext(entry))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = True
+
+ with patch("selinux.restorecon") as mock_restorecon, \
+ patch("selinux.lsetfilecon") as mock_lsetfilecon:
+ # no context given
+ self.assertTrue(ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ entry.set("secontext", "__default__")
+ self.assertTrue(ptool._set_secontext(entry))
+ mock_restorecon.assert_called_with(entry.get("name"))
+ self.assertFalse(mock_lsetfilecon.called)
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 0
+ entry.set("secontext", "foo_t")
+ self.assertTrue(ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ mock_restorecon.reset_mock()
+ mock_lsetfilecon.reset_mock()
+ mock_lsetfilecon.return_value = 1
+ self.assertFalse(ptool._set_secontext(entry))
+ self.assertFalse(mock_restorecon.called)
+ mock_lsetfilecon.assert_called_with(entry.get("name"), "foo_t")
+
+ @patch("grp.getgrnam")
+ def test_norm_gid(self, mock_getgrnam):
+ ptool = get_posixtool_object()
+ self.assertEqual(5, ptool._norm_gid("5"))
+ self.assertFalse(mock_getgrnam.called)
+
+ mock_getgrnam.reset_mock()
+ mock_getgrnam.return_value = ("group", "x", 5, [])
+ self.assertEqual(5, ptool._norm_gid("group"))
+ mock_getgrnam.assert_called_with("group")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_gid")
+ def test_norm_entry_gid(self, mock_norm_gid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ ptool = get_posixtool_object()
+ mock_norm_gid.return_value = 10
+ self.assertEqual(10, ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ mock_norm_gid.reset_mock()
+ mock_norm_gid.side_effect = KeyError
+ self.assertEqual(0, ptool._norm_entry_gid(entry))
+ mock_norm_gid.assert_called_with(entry.get("group"))
+
+ @patch("pwd.getpwnam")
+ def test_norm_uid(self, mock_getpwnam):
+ ptool = get_posixtool_object()
+ self.assertEqual(5, ptool._norm_uid("5"))
+ self.assertFalse(mock_getpwnam.called)
+
+ mock_getpwnam.reset_mock()
+ mock_getpwnam.return_value = ("user", "x", 5, 5, "User", "/home/user",
+ "/bin/zsh")
+ self.assertEqual(5, ptool._norm_uid("user"))
+ mock_getpwnam.assert_called_with("user")
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_uid")
+ def test_norm_entry_uid(self, mock_norm_uid):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user")
+ ptool = get_posixtool_object()
+ mock_norm_uid.return_value = 10
+ self.assertEqual(10, ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ mock_norm_uid.reset_mock()
+ mock_norm_uid.side_effect = KeyError
+ self.assertEqual(0, ptool._norm_entry_uid(entry))
+ mock_norm_uid.assert_called_with(entry.get("owner"))
+
+ def test_norm_acl_perms(self):
+ ptool = get_posixtool_object()
+ # there's basically no reasonably way to test the Permset
+ # object parsing feature without writing our own Mock object
+ # that re-implements Permset.test(). silly pylibacl won't let
+ # us create standalone Entry or Permset objects.
+ ptool = get_posixtool_object()
+ self.assertEqual(5, ptool._norm_acl_perms("5"))
+ self.assertEqual(0, ptool._norm_acl_perms("55"))
+ self.assertEqual(5, ptool._norm_acl_perms("rx"))
+ self.assertEqual(5, ptool._norm_acl_perms("r-x"))
+ self.assertEqual(6, ptool._norm_acl_perms("wr-"))
+ self.assertEqual(0, ptool._norm_acl_perms("rwrw"))
+ self.assertEqual(0, ptool._norm_acl_perms("-"))
+ self.assertEqual(0, ptool._norm_acl_perms("a"))
+ self.assertEqual(6, ptool._norm_acl_perms("rwa"))
+ self.assertEqual(4, ptool._norm_acl_perms("rr"))
+
+ def test__gather_data(self):
+ path = '/test'
+ ptool = get_posixtool_object()
+
+ # have to use context manager version of patch here because
+ # os.stat must be unpatched when we instantiate the object to
+ # make pkgutil.walk_packages() work
+ with patch('os.stat') as mock_stat:
+ mock_stat.side_effect = OSError
+ self.assertFalse(ptool._gather_data(path)[0])
+ mock_stat.assert_called_with(path)
+
+ mock_stat.reset_mock()
+ mock_stat.side_effect = None
+ # create a return value
+ stat_rv = MagicMock()
+ def stat_getitem(key):
+ if int(key) == stat.ST_UID:
+ return 0
+ elif int(key) == stat.ST_GID:
+ return 10
+ elif int(key) == stat.ST_MODE:
+ # return extra bits in the mode to emulate a device
+ # and ensure that they're stripped
+ return int('060660', 8)
+ stat_rv.__getitem__ = Mock(side_effect=stat_getitem)
+ mock_stat.return_value = stat_rv
+
+ # disable selinux and acls for this call -- we test them
+ # separately so that we can skip those tests as appropriate
+ states = (Bcfg2.Client.Tools.POSIX.base.has_selinux,
+ Bcfg2.Client.Tools.POSIX.base.has_acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(ptool._gather_data(path),
+ (stat_rv, '0', '10', '0660', None, None))
+ Bcfg2.Client.Tools.POSIX.base.has_selinux, \
+ Bcfg2.Client.Tools.POSIX.base.has_acls = states
+ mock_stat.assert_called_with(path)
+
+ @unittest.skipUnless(has_selinux, "SELinux not found, skipping")
+ def test__gather_data_selinux(self):
+ context = 'system_u:object_r:root_t:s0'
+ path = '/test'
+ ptool = get_posixtool_object()
+ with patch("selinux.getfilecon") as mock_getfilecon, \
+ patch('os.stat') as mock_stat:
+ mock_getfilecon.return_value = [len(context) + 1, context]
+ mock_stat.return_value = MagicMock()
+ # disable acls for this call and test them separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_acls
+ Bcfg2.Client.Tools.POSIX.base.has_acls = False
+ self.assertEqual(ptool._gather_data(path)[4], 'root_t')
+ Bcfg2.Client.Tools.POSIX.base.has_acls = state
+ mock_getfilecon.assert_called_with(path)
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_file_acls")
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ def test__gather_data_acls(self, mock_list_file_acls):
+ acls = {("default", posix1e.ACL_USER, "testuser"): "rwx",
+ ("access", posix1e.ACL_GROUP, "testgroup"): "rx"}
+ mock_list_file_acls.return_value = acls
+ path = '/test'
+ ptool = get_posixtool_object()
+ with patch('os.stat') as mock_stat:
+ mock_stat.return_value = MagicMock()
+ # disable selinux for this call and test it separately
+ state = Bcfg2.Client.Tools.POSIX.base.has_selinux
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = False
+ self.assertItemsEqual(ptool._gather_data(path)[5], acls)
+ Bcfg2.Client.Tools.POSIX.base.has_selinux = state
+ mock_list_file_acls.assert_called_with(path)
+
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._verify_acls")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._gather_data")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_uid")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._norm_entry_gid")
+ def test_verify_metadata(self, mock_norm_gid, mock_norm_uid,
+ mock_gather_data, mock_verify_acls):
+ entry = lxml.etree.Element("Path", name="/test", type="file",
+ group="group", owner="user", perms="664",
+ secontext='etc_t')
+ # _verify_metadata() mutates the entry, so we keep a backup so we
+ # can start fresh every time
+ orig_entry = copy.deepcopy(entry)
+
+ ptool = get_posixtool_object()
+
+ mock_gather_data.return_value = (False, None, None, None, None, None)
+ self.assertFalse(ptool._verify_metadata(entry))
+ self.assertEqual(entry.get("current_exists", "").lower(), "false")
+ mock_gather_data.assert_called_with(entry.get("name"))
+
+ # expected data. tuple of attr, return value index, value
+ expected = [('current_owner', 1, '0'),
+ ('current_group', 2, '10'),
+ ('current_perms', 3, '0664'),
+ ('current_secontext', 4, 'etc_t')]
+ mock_norm_uid.return_value = 0
+ mock_norm_gid.return_value = 10
+ gather_data_rv = [MagicMock(), None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+
+ entry = copy.deepcopy(orig_entry)
+ mock_gather_data.reset_mock()
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ self.assertTrue(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+
+ mtime = 1344430414
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ stat_rv = MagicMock()
+ stat_rv.__getitem__.return_value = mtime
+ gather_data_rv[0] = stat_rv
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure modes for each checked datum. tuple of changed attr,
+ # return value index, new (failing) value
+ failures = [('current_owner', 1, '10'),
+ ('current_group', 2, '100'),
+ ('current_perms', 3, '0660')]
+ if has_selinux:
+ failures.append(('current_secontext', 4, 'root_t'))
+
+ for fail_attr, fail_idx, fail_val in failures:
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ gather_data_rv[fail_idx] = fail_val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ self.assertEqual(entry.get(fail_attr), fail_val)
+ for attr, idx, val in expected:
+ if attr != fail_attr:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ # failure mode for mtime
+ fail_mtime = 1344431162
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ fail_stat_rv = MagicMock()
+ fail_stat_rv.__getitem__.return_value = fail_mtime
+ gather_data_rv = [fail_stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertFalse(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry, path=entry.get("name"))
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(fail_mtime))
+
+ if has_selinux:
+ # test success and failure for __default__ secontext
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ with patch("selinux.matchpathcon") as mock_matchpathcon:
+ context1 = "system_u:object_r:etc_t:s0"
+ context2 = "system_u:object_r:root_t:s0"
+ mock_matchpathcon.return_value = [1 + len(context1),
+ context1]
+ gather_data_rv = [stat_rv, None, None, None, None, []]
+ for attr, idx, val in expected:
+ gather_data_rv[idx] = val
+ mock_gather_data.return_value = tuple(gather_data_rv)
+ self.assertTrue(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ entry = copy.deepcopy(orig_entry)
+ entry.set("mtime", str(mtime))
+ entry.set("secontext", "__default__")
+ mock_gather_data.reset_mock()
+ mock_norm_uid.reset_mock()
+ mock_norm_gid.reset_mock()
+ mock_matchpathcon.return_value = [1 + len(context2),
+ context2]
+ self.assertFalse(ptool._verify_metadata(entry))
+ mock_gather_data.assert_called_with(entry.get("name"))
+ mock_verify_acls.assert_called_with(entry,
+ path=entry.get("name"))
+ mock_matchpathcon.assert_called_with(entry.get("name"), 0)
+ self.assertEqual(entry.get("current_exists", 'true'), 'true')
+ for attr, idx, val in expected:
+ self.assertEqual(entry.get(attr), val)
+ self.assertEqual(entry.get("current_mtime"), str(mtime))
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ def test_list_entry_acls(self):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ lxml.etree.SubElement(entry, "ACL", scope="user", type="default",
+ user="user", perms="rwx")
+ lxml.etree.SubElement(entry, "ACL", scope="group", type="access",
+ group="group", perms="5")
+ ptool = get_posixtool_object()
+ self.assertItemsEqual(ptool._list_entry_acls(entry),
+ {("default", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5})
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("pwd.getpwuid")
+ @patch("grp.getgrgid")
+ def test_list_file_acls(self, mock_getgrgid, mock_getpwuid):
+ path = '/test'
+ ptool = get_posixtool_object()
+ with patch("posix1e.ACL") as mock_ACL, \
+ patch("os.path.isdir") as mock_isdir:
+ # build a set of file ACLs to return from posix1e.ACL(file=...)
+ file_acls = []
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_USER
+ acl.qualifier = 10
+ # yes, this is a bogus permset. thanks to _norm_acl_perms
+ # it works and is easier than many of the alternatives.
+ acl.permset = 'rwx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_GROUP
+ acl.qualifier = 100
+ acl.permset = 'rx'
+ file_acls.append(acl)
+ acl = Mock()
+ acl.tag_type = posix1e.ACL_MASK
+ file_acls.append(acl)
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("access", posix1e.ACL_GROUP, "group"): 5}
+
+ # set up the unreasonably complex return value for
+ # posix1e.ACL(), which has two separate uses
+ fileacl_rv = MagicMock()
+ fileacl_rv.valid.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv = MagicMock()
+ filedef_rv.valid.return_value = True
+ filedef_rv.__iter__.return_value = iter(file_acls)
+ def mock_acl_rv(file=None, filedef=None):
+ if file:
+ return fileacl_rv
+ elif filedef:
+ return filedef_rv
+ # other return values
+ mock_isdir.return_value = False
+ mock_getgrgid.return_value = ("group", "x", 5, [])
+ mock_getpwuid.return_value = ("user", "x", 5, 5, "User",
+ "/home/user", "/bin/zsh")
+
+ def reset():
+ mock_isdir.reset_mock()
+ mock_getgrgid.reset_mock()
+ mock_getpwuid.reset_mock()
+ mock_ACL.reset_mock()
+
+ mock_ACL.side_effect = IOError(95, "Operation not supported")
+ self.assertItemsEqual(ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = IOError
+ self.assertItemsEqual(ptool._list_file_acls(path), dict())
+
+ reset()
+ mock_ACL.side_effect = mock_acl_rv
+ self.assertItemsEqual(ptool._list_file_acls(path), acls)
+ mock_isdir.assert_called_with(path)
+ mock_getgrgid.assert_called_with(100)
+ mock_getpwuid.assert_called_with(10)
+ mock_ACL.assert_called_with(file=path)
+
+ reset()
+ mock_isdir.return_value = True
+ fileacl_rv.__iter__.return_value = iter(file_acls)
+ filedef_rv.__iter__.return_value = iter(file_acls)
+
+ defacls = acls
+ for akey, perms in acls.items():
+ defacls[('default', akey[1], akey[2])] = perms
+ self.assertItemsEqual(ptool._list_file_acls(path), defacls)
+ mock_isdir.assert_called_with(path)
+ self.assertItemsEqual(mock_getgrgid.call_args_list,
+ [call(100), call(100)])
+ self.assertItemsEqual(mock_getpwuid.call_args_list,
+ [call(10), call(10)])
+ self.assertItemsEqual(mock_ACL.call_args_list,
+ [call(file=path), call(filedef=path)])
+
+ @unittest.skipUnless(has_acls, "ACLS not found, skipping")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_file_acls")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._list_entry_acls")
+ def test_verify_acls(self, mock_list_entry_acls, mock_list_file_acls):
+ entry = lxml.etree.Element("Path", name="/test", type="file")
+ ptool = get_posixtool_object()
+ # we can't test to make sure that errors get properly sorted
+ # into (missing, extra, wrong) without refactoring the
+ # _verify_acls code, and I don't feel like doing that, so eff
+ # it. let's just test to make sure that failures are
+ # identified at all for now.
+
+ acls = {("access", posix1e.ACL_USER, "user"): 7,
+ ("default", posix1e.ACL_GROUP, "group"): 5}
+ extra_acls = copy.deepcopy(acls)
+ extra_acls[("access", posix1e.ACL_USER, "user2")] = 4
+
+ mock_list_entry_acls.return_value = acls
+ mock_list_file_acls.return_value = acls
+ self.assertTrue(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test missing
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_file_acls.return_value = extra_acls
+ self.assertFalse(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test extra
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = acls
+ self.assertFalse(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ # test wrong
+ wrong_acls = copy.deepcopy(extra_acls)
+ wrong_acls[("access", posix1e.ACL_USER, "user2")] = 5
+ mock_list_entry_acls.reset_mock()
+ mock_list_file_acls.reset_mock()
+ mock_list_entry_acls.return_value = extra_acls
+ mock_list_file_acls.return_value = wrong_acls
+ self.assertFalse(ptool._verify_acls(entry))
+ mock_list_entry_acls.assert_called_with(entry)
+ mock_list_file_acls.assert_called_with(entry.get("name"))
+
+ @patch("os.makedirs")
+ @patch("os.path.exists")
+ @patch("Bcfg2.Client.Tools.POSIX.base.POSIXTool._set_perms")
+ def test_makedirs(self, mock_set_perms, mock_exists, mock_makedirs):
+ entry = lxml.etree.Element("Path", name="/test/foo/bar",
+ type="directory")
+
+ def reset():
+ mock_exists.reset_mock()
+ mock_set_perms.reset_mock()
+ mock_makedirs.reset_mock()
+
+ ptool = get_posixtool_object()
+ mock_set_perms.return_value = True
+ def path_exists_rv(path):
+ if path == "/test":
+ return True
+ else:
+ return False
+ mock_exists.side_effect = path_exists_rv
+ self.assertTrue(ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))
+
+ reset()
+ mock_makedirs.side_effect = OSError
+ self.assertFalse(ptool._makedirs(entry))
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+
+ reset()
+ mock_makedirs.side_effect = None
+ def set_perms_rv(entry, path=None):
+ if path == '/test/foo':
+ return False
+ else:
+ return True
+ mock_set_perms.side_effect = set_perms_rv
+ self.assertFalse(ptool._makedirs(entry))
+ self.assertItemsEqual(mock_exists.call_args_list,
+ [call("/test"), call("/test/foo"),
+ call("/test/foo/bar")])
+ self.assertItemsEqual(mock_set_perms.call_args_list,
+ [call(entry, path="/test/foo"),
+ call(entry, path="/test/foo/bar")])
+ mock_makedirs.assert_called_with(entry.get("name"))