diff options
author | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2012-03-20 13:06:16 -0400 |
---|---|---|
committer | Chris St. Pierre <chris.a.st.pierre@gmail.com> | 2012-03-20 13:06:34 -0400 |
commit | d939fc920fe5d69768b017c873f1bbef5f4b2795 (patch) | |
tree | 4a47af2c82b60730789132e4b13a9e81765535fc /testsuite | |
parent | 60c79ef728f1d932e8da8583e2719b6acd900d9e (diff) | |
download | bcfg2-d939fc920fe5d69768b017c873f1bbef5f4b2795.tar.gz bcfg2-d939fc920fe5d69768b017c873f1bbef5f4b2795.tar.bz2 bcfg2-d939fc920fe5d69768b017c873f1bbef5f4b2795.zip |
organized testsuite to mirror src
updated TestOptions
removed terrifically out-of-date TestFrame and TestPlugin
wrote unit tests for Metadata plugin
various tweaks and bugfixes for Metadata plugin
Diffstat (limited to 'testsuite')
-rw-r--r-- | testsuite/TestFrame.py | 104 | ||||
-rw-r--r-- | testsuite/TestOptions.py | 103 | ||||
-rw-r--r-- | testsuite/TestPlugin.py | 122 | ||||
-rw-r--r-- | testsuite/Testlib/TestOptions.py | 124 | ||||
-rw-r--r-- | testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py | 898 |
5 files changed, 1022 insertions, 329 deletions
diff --git a/testsuite/TestFrame.py b/testsuite/TestFrame.py deleted file mode 100644 index a76c97eec..000000000 --- a/testsuite/TestFrame.py +++ /dev/null @@ -1,104 +0,0 @@ -import lxml.etree - -import Bcfg2.Client.Frame -import Bcfg2.Client.Tools - -c1 = lxml.etree.XML("<Configuration><Bundle name='foo'><Configfile name='/tmp/test12' owner='root' group='root' empty='true' perms='644'/></Bundle></Configuration>") - -c2 = lxml.etree.XML("<Configuration><Bundle name='foo'><Configfile name='/tmp/test12' owner='root' group='root' empty='true' perms='644'/><Configfile name='/tmp/test12' owner='root' group='root' empty='true' perms='644'/></Bundle></Configuration>") - - -class DriverInitFail(object): - def __init__(self, *args): - raise Bcfg2.Client.Tools.toolInstantiationError - - -class DriverInventoryFail(object): - __name__ = 'dif' - - def __init__(self, logger, setup, config): - self.config = config - self.handled = [] - self.modified = [] - self.extra = [] - - def Inventory(self): - raise Error - - -class TestFrame(object): - def test__init(self): - setup = {} - times = {} - config = lxml.etree.Element('Configuration') - frame = Bcfg2.Client.Frame.Frame(config, setup, times, [], False) - assert frame.tools == [] - - def test__init2(self): - setup = {} - times = {} - frame2 = Bcfg2.Client.Frame.Frame(c1, setup, times, ['POSIX'], False) - assert len(frame2.tools) == 1 - - def test__init3(self): - setup = {} - times = {} - frame3 = Bcfg2.Client.Frame.Frame(c2, setup, times, ['foo'], False) - assert len(frame3.tools) == 0 - - def test__init4(self): - setup = {} - times = {} - frame = Bcfg2.Client.Frame.Frame(c2, setup, times, [DriverInitFail], False) - assert len(frame.tools) == 0 - - def test__Decide_Inventory(self): - setup = {'remove': 'none', - 'bundle': [], - 'interactive': False} - times = {} - frame = Bcfg2.Client.Frame.Frame(c2, setup, times, - [DriverInventoryFail], False) - assert len(frame.tools) == 1 - frame.Inventory() - assert len([x for x in list(frame.states.values()) if x]) == 0 - frame.Decide() - assert len(frame.whitelist) - - def test__Decide_Bundle(self): - setup = {'remove': 'none', - 'bundle': ['bar'], - 'interactive': False} - times = {} - frame = Bcfg2.Client.Frame.Frame(c2, setup, times, - [DriverInventoryFail], False) - assert len(frame.tools) == 1 - frame.Inventory() - assert len([x for x in list(frame.states.values()) if x]) == 0 - frame.Decide() - assert len(frame.whitelist) == 0 - - def test__Decide_Dryrun(self): - setup = {'remove': 'none', - 'bundle': [], - 'interactive': False} - times = {} - frame = Bcfg2.Client.Frame.Frame(c2, setup, times, - [DriverInventoryFail], True) - assert len(frame.tools) == 1 - frame.Inventory() - assert len([x for x in list(frame.states.values()) if x]) == 0 - frame.Decide() - assert len(frame.whitelist) == 0 - - def test__GenerateStats(self): - setup = {'remove': 'none', - 'bundle': [], - 'interactive': False} - times = {} - frame = Bcfg2.Client.Frame.Frame(c2, setup, times, - [DriverInventoryFail], False) - frame.Inventory() - frame.Decide() - stats = frame.GenerateStats() - assert len(stats.findall('.//Bad')[0].getchildren()) != 0 diff --git a/testsuite/TestOptions.py b/testsuite/TestOptions.py deleted file mode 100644 index 735455d45..000000000 --- a/testsuite/TestOptions.py +++ /dev/null @@ -1,103 +0,0 @@ -import os -import sys - -import Bcfg2.Options - - -class TestOption(object): - def test__init(self): - o = Bcfg2.Options.Option('foo', False, cmd='-F') - try: - p = Bcfg2.Options.Option('foo', False, cmd='--F') - assert False - except Bcfg2.Options.OptionFailure: - pass - - def test_parse(self): - o = Bcfg2.Options.Option('foo', 'test4', cmd='-F', env='TEST2', - odesc='bar', cf=('communication', 'password')) - o.parse([], ['-F', 'test']) - assert o.value == 'test' - o.parse([('-F', 'test2')], []) - assert o.value == 'test2' - os.environ['TEST2'] = 'test3' - o.parse([], []) - assert o.value == 'test3' - del os.environ['TEST2'] - o.parse([], []) - print(o.value) - assert o.value == 'foobat' - o.cf = ('communication', 'pwd') - o.parse([], []) - print(o.value) - assert o.value == 'test4' - o.cf = False - o.parse([], []) - assert o.value == 'test4' - - def test_cook(self): - # check that default value isn't cooked - o1 = Bcfg2.Options.Option('foo', 'test4', cook=Bcfg2.Options.bool_cook) - o1.parse([], []) - assert o1.value == 'test4' - o2 = Bcfg2.Options.Option('foo', False, cmd='-F') - o2.parse([('-F', '')], []) - assert o2.value == True - - -class TestOptionSet(object): - def test_buildGetopt(self): - opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), - ('bar', Bcfg2.Options.Option('foo', 'test2')), - ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H', - odesc='1'))] - os = Bcfg2.Options.OptionSet(opts) - res = os.buildGetopt() - assert 'H:' in res and 'G' in res and len(res) == 3 - - def test_buildLongGetopt(self): - opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), - ('bar', Bcfg2.Options.Option('foo', 'test2')), - ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='--H', - odesc='1', long_arg=True))] - os = Bcfg2.Options.OptionSet(opts) - res = os.buildLongGetopt() - print(res) - assert 'H=' in res and len(res) == 1 - - def test_parse(self): - opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), - ('bar', Bcfg2.Options.Option('foo', 'test2')), - ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H', - odesc='1'))] - os = Bcfg2.Options.OptionSet(opts) - try: - os.parse(['-G', '-H']) - assert False - except SystemExit: - pass - os2 = Bcfg2.Options.OptionSet(opts) - try: - os2.parse(['-h']) - assert False - except SystemExit: - pass - os3 = Bcfg2.Options.OptionSet(opts) - os3.parse(['-G']) - assert os3['foo'] == True - - -class TestOptionParser(object): - def test__init(self): - opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-h')), - ('bar', Bcfg2.Options.Option('foo', 'test2')), - ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H', - odesc='1'))] - os1 = Bcfg2.Options.OptionParser(opts) - assert Bcfg2.Options.Option.cfpath == '/etc/bcfg2.conf' - sys.argv = ['foo', '-C', '/usr/local/etc/bcfg2.conf'] - os2 = Bcfg2.Options.OptionParser(opts) - assert Bcfg2.Options.Option.cfpath == '/usr/local/etc/bcfg2.conf' - sys.argv = [] - os3 = Bcfg2.Options.OptionParser(opts) - assert Bcfg2.Options.Option.cfpath == '/etc/bcfg2.conf' diff --git a/testsuite/TestPlugin.py b/testsuite/TestPlugin.py deleted file mode 100644 index aa619249a..000000000 --- a/testsuite/TestPlugin.py +++ /dev/null @@ -1,122 +0,0 @@ -import gamin -import lxml.etree -import os - -import Bcfg2.Server.Core -from Bcfg2.Server.Plugin import EntrySet - - -class es_testtype(object): - def __init__(self, name, properties, specific): - self.name = name - self.properties = properties - self.specific = specific - self.handled = 0 - self.built = 0 - - def handle_event(self, event): - self.handled += 1 - - def bind_entry(self, entry, metadata): - entry.set('bound', '1') - entry.set('name', self.name) - self.built += 1 - - -class metadata(object): - def __init__(self, hostname): - self.hostname = hostname - self.groups = ['base', 'debian'] - -#FIXME add test_specific - - -class test_entry_set(object): - def __init__(self): - self.dirname = '/tmp/estest-%d' % os.getpid() - os.path.isdir(self.dirname) or os.mkdir(self.dirname) - self.metadata = metadata('testhost') - self.es = EntrySet('template', self.dirname, None, es_testtype) - self.e = Bcfg2.Server.Core.GaminEvent(1, 'template', - gamin.GAMExists) - - def test_init(self): - es = self.es - e = self.e - e.action = 'exists' - es.handle_event(e) - es.handle_event(e) - assert len(es.entries) == 1 - assert list(es.entries.values())[0].handled == 2 - e.action = 'changed' - es.handle_event(e) - assert list(es.entries.values())[0].handled == 3 - - def test_info(self): - """Test info and info.xml handling.""" - es = self.es - e = self.e - dirname = self.dirname - metadata = self.metadata - - # test 'info' handling - assert es.metadata['group'] == 'root' - self.mk_info(dirname) - e.filename = 'info' - e.action = 'exists' - es.handle_event(e) - assert es.metadata['group'] == 'sys' - e.action = 'deleted' - es.handle_event(e) - assert es.metadata['group'] == 'root' - - # test 'info.xml' handling - assert es.infoxml == None - self.mk_info_xml(dirname) - e.filename = 'info.xml' - e.action = 'exists' - es.handle_event(e) - assert es.infoxml - e.action = 'deleted' - es.handle_event(e) - assert es.infoxml == None - - def test_file_building(self): - """Test file building.""" - self.test_init() - ent = lxml.etree.Element('foo') - self.es.bind_entry(ent, self.metadata) - print(list(self.es.entries.values())[0]) - assert list(self.es.entries.values())[0].built == 1 - - def test_host_specific_file_building(self): - """Add a host-specific template and build it.""" - self.e.filename = 'template.H_%s' % self.metadata.hostname - self.e.action = 'exists' - self.es.handle_event(self.e) - assert len(self.es.entries) == 1 - ent = lxml.etree.Element('foo') - self.es.bind_entry(ent, self.metadata) - # FIXME need to test that it built the _right_ file here - - def test_deletion(self): - """Test deletion of files.""" - self.test_init() - self.e.filename = 'template' - self.e.action = 'deleted' - self.es.handle_event(self.e) - assert len(self.es.entries) == 0 - - # TODO - how to clean up the temp dir & files after tests done? - - def mk_info(self, dir): - i = open("%s/info" % dir, 'w') - i.write('owner: root\n') - i.write('group: sys\n') - i.write('perms: 0600\n') - i.close - - def mk_info_xml(self, dir): - i = open("%s/info.xml" % dir, 'w') - i.write('<FileInfo><Info owner="root" group="other" perms="0600" /></FileInfo>\n') - i.close diff --git a/testsuite/Testlib/TestOptions.py b/testsuite/Testlib/TestOptions.py new file mode 100644 index 000000000..f5833a54a --- /dev/null +++ b/testsuite/Testlib/TestOptions.py @@ -0,0 +1,124 @@ +import os +import sys +import unittest +from mock import Mock, patch +import Bcfg2.Options + +class TestOption(unittest.TestCase): + def test__init(self): + self.assertRaises(Bcfg2.Options.OptionFailure, + Bcfg2.Options.Option, + 'foo', False, cmd='f') + self.assertRaises(Bcfg2.Options.OptionFailure, + Bcfg2.Options.Option, + 'foo', False, cmd='--f') + self.assertRaises(Bcfg2.Options.OptionFailure, + Bcfg2.Options.Option, + 'foo', False, cmd='-foo') + self.assertRaises(Bcfg2.Options.OptionFailure, + Bcfg2.Options.Option, + 'foo', False, cmd='-foo', long_arg=True) + + @patch('ConfigParser.ConfigParser') + @patch('__builtin__.open') + def test_getCFP(self, mock_open, mock_cp): + mock_cp.return_value = Mock() + o = Bcfg2.Options.Option('foo', False, cmd='-f') + self.assertFalse(o._Option__cfp) + o.getCFP() + mock_cp.assert_any_call() + mock_open.assert_any_call(Bcfg2.Options.DEFAULT_CONFIG_LOCATION) + self.assertTrue(mock_cp.return_value.readfp.called) + + @patch('Bcfg2.Options.Option.cfp') + def test_parse(self, mock_cfp): + cf = ('communication', 'password') + o = Bcfg2.Options.Option('foo', 'test4', cmd='-F', env='TEST2', + odesc='bar', cf=cf) + o.parse([], ['-F', 'test']) + self.assertEqual(o.value, 'test') + o.parse([('-F', 'test2')], []) + self.assertEqual(o.value, 'test2') + + os.environ['TEST2'] = 'test3' + o.parse([], []) + self.assertEqual(o.value, 'test3') + del os.environ['TEST2'] + + mock_cfp.get = Mock() + mock_cfp.get.return_value = 'test5' + o.parse([], []) + self.assertEqual(o.value, 'test5') + mock_cfp.get.assert_any_call(*cf) + + o.cf = False + o.parse([], []) + assert o.value == 'test4' + + def test_cook(self): + # check that default value isn't cooked + o1 = Bcfg2.Options.Option('foo', 'test4', cook=Bcfg2.Options.bool_cook) + o1.parse([], []) + assert o1.value == 'test4' + o2 = Bcfg2.Options.Option('foo', False, cmd='-F') + o2.parse([('-F', '')], []) + assert o2.value == True + + +class TestOptionSet(unittest.TestCase): + def test_buildGetopt(self): + opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), + ('bar', Bcfg2.Options.Option('foo', 'test2')), + ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H', + odesc='1'))] + oset = Bcfg2.Options.OptionSet(opts) + res = oset.buildGetopt() + self.assertIn('H:', res) + self.assertIn('G', res) + self.assertEqual(len(res), 3) + + def test_buildLongGetopt(self): + opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), + ('bar', Bcfg2.Options.Option('foo', 'test2')), + ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='--H', + odesc='1', long_arg=True))] + oset = Bcfg2.Options.OptionSet(opts) + res = oset.buildLongGetopt() + self.assertIn('H=', res) + self.assertEqual(len(res), 1) + + def test_parse(self): + opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')), + ('bar', Bcfg2.Options.Option('foo', 'test2')), + ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H', + odesc='1'))] + oset = Bcfg2.Options.OptionSet(opts) + self.assertRaises(SystemExit, + oset.parse, + ['-G', '-H']) + oset2 = Bcfg2.Options.OptionSet(opts) + self.assertRaises(SystemExit, + oset2.parse, + ['-h']) + oset3 = Bcfg2.Options.OptionSet(opts) + oset3.parse(['-G']) + self.assertTrue(oset3['foo']) + + +class TestOptionParser(unittest.TestCase): + def test__init(self): + opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-h')), + ('bar', Bcfg2.Options.Option('foo', 'test2')), + ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H', + odesc='1'))] + oset1 = Bcfg2.Options.OptionParser(opts) + self.assertEqual(Bcfg2.Options.Option.cfpath, + Bcfg2.Options.DEFAULT_CONFIG_LOCATION) + sys.argv = ['foo', '-C', '/usr/local/etc/bcfg2.conf'] + oset2 = Bcfg2.Options.OptionParser(opts) + self.assertEqual(Bcfg2.Options.Option.cfpath, + '/usr/local/etc/bcfg2.conf') + sys.argv = [] + oset3 = Bcfg2.Options.OptionParser(opts) + self.assertEqual(Bcfg2.Options.Option.cfpath, + Bcfg2.Options.DEFAULT_CONFIG_LOCATION) diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py new file mode 100644 index 000000000..4d0b96ee7 --- /dev/null +++ b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py @@ -0,0 +1,898 @@ +import os +import copy +import time +import socket +import unittest +import lxml.etree +from mock import Mock, patch +import Bcfg2.Server.Plugin +from Bcfg2.Server.Plugins.Metadata import * + +XI_NAMESPACE = "http://www.w3.org/2001/XInclude" +XI = "{%s}" % XI_NAMESPACE + +clients_test_tree = lxml.etree.XML(''' +<Clients> + <Client name="client1" address="1.2.3.1" auth="cert" + location="floating" password="password2" profile="group1"/> + <Client name="client2" address="1.2.3.2" secure="true" profile="group2"/> + <Client name="client3" address="1.2.3.3" uuid="uuid1" profile="group1" + password="password2"> + <Alias name="alias1"/> + </Client> + <Client name="client4" profile="group1"> + <Alias name="alias2" address="1.2.3.2"/> + <Alias name="alias3"/> + </Client> + <Client name="client5" profile="group1"/> + <Client name="client6" profile="group1" auth="bootstrap"/> + <Client name="client7" profile="group1" auth="cert" address="1.2.3.4"/> + <Client name="client8" profile="group1" auth="cert+password" + address="1.2.3.5"/> + <Client name="client9" profile="group2" secure="true" password="password3"/> +</Clients>''').getroottree() + +groups_test_tree = lxml.etree.XML(''' +<Groups xmlns:xi="http://www.w3.org/2001/XInclude"> + <Group name="group1" default="true" profile="true" public="true" + category="category1"/> + <Group name="group2" profile="true" public="true" category="category1"> + <Bundle name="bundle1"/> + <Bundle name="bundle2"/> + <Group name="group1"/> + <Group name="group4"/> + </Group> + <Group name="group3" category="category2" public="false"/> + <Group name="group4" category="category1"> + <Group name="group1"/> + <Group name="group6"/> + </Group> + <Group name="group5"/> + <Group name="group7"> + <Bundle name="bundle3"/> + </Group> + <Group name="group8"> + <Group name="group9"/> + </Group> +</Groups>''').getroottree() + +datastore = "/" + +def get_metadata_object(core=None, watch_clients=False): + if core is None: + core = Mock() + metadata = Metadata(core, datastore, watch_clients=watch_clients) + #metadata.logger = Mock() + return metadata + + +class TestXMLMetadataConfig(unittest.TestCase): + def get_config_object(self, basefile=None, core=None, watch_clients=False): + self.metadata = get_metadata_object(core=core, + watch_clients=watch_clients) + return XMLMetadataConfig(self.metadata, watch_clients, basefile) + + def test_xdata(self): + config = self.get_config_object() + # we can't use assertRaises here because xdata is a property + try: + config.xdata + except MetadataRuntimeError: + pass + except: + assert False + config.data = "<test/>" + self.assertEqual(config.xdata, "<test/>") + + def test_base_xdata(self): + config = self.get_config_object() + # we can't use assertRaises here because base_xdata is a property + try: + config.base_xdata + except MetadataRuntimeError: + pass + except: + assert False + config.basedata = "<test/>" + self.assertEqual(config.base_xdata, "<test/>") + + def test_add_monitor(self): + core = Mock() + config = self.get_config_object(core=core) + config.extras = [] + config.add_monitor("test.xml") + self.assertFalse(core.fam.AddMonitor.called) + self.assertEqual(config.extras, []) + + config = self.get_config_object(core=core, watch_clients=True) + config.add_monitor("test.xml") + core.fam.AddMonitor.assert_called_with(os.path.join(self.metadata.data, + "test.xml"), + self.metadata) + self.assertItemsEqual(config.extras, ["test.xml"]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.add_monitor") + @patch("lxml.etree.parse") + def test_load_xml(self, mock_parse, mock_add_monitor): + config = self.get_config_object("clients.xml") + mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None, + None) + config.load_xml() + self.assertIsNone(config.data) + self.assertIsNone(config.basedata) + + config.data = None + config.basedata = None + mock_parse.side_effect = None + mock_parse.return_value.findall = Mock(return_value=[]) + config.load_xml() + self.assertIsNotNone(config.data) + self.assertIsNotNone(config.basedata) + + config.data = None + config.basedata = None + mock_parse.return_value.findall = \ + Mock(return_value=[lxml.etree.Element(XI + "include", + href="more.xml"), + lxml.etree.Element(XI + "include", + href="evenmore.xml")]) + config.load_xml() + mock_add_monitor.assert_any_call("more.xml") + mock_add_monitor.assert_any_call("evenmore.xml") + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml") + def test_write(self, mock_write_xml): + config = self.get_config_object("clients.xml") + config.basedata = "<test/>" + config.write() + mock_write_xml.assert_called_with(os.path.join(self.metadata.data, + "clients.xml"), + "<test/>") + + @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False)) + @patch('fcntl.lockf', Mock()) + @patch('__builtin__.open') + @patch('os.unlink') + @patch('os.rename') + @patch('os.path.islink') + @patch('os.readlink') + def test_write_xml(self, mock_readlink, mock_islink, mock_rename, + mock_unlink, mock_open): + fname = "clients.xml" + config = self.get_config_object(fname) + fpath = os.path.join(self.metadata.data, fname) + tmpfile = "%s.new" % fpath + linkdest = os.path.join(self.metadata.data, "client-link.xml") + + mock_islink.return_value = False + + config.write_xml(fpath, clients_test_tree) + mock_open.assert_called_with(tmpfile, "w") + self.assertTrue(mock_open.return_value.write.called) + mock_islink.assert_called_with(fpath) + mock_rename.assert_called_with(tmpfile, fpath) + + mock_islink.return_value = True + mock_readlink.return_value = linkdest + config.write_xml(fpath, clients_test_tree) + mock_rename.assert_called_with(tmpfile, linkdest) + + mock_rename.side_effect = OSError + self.assertRaises(MetadataRuntimeError, + config.write_xml, fpath, clients_test_tree) + + mock_open.return_value.write.side_effect = IOError + self.assertRaises(MetadataRuntimeError, + config.write_xml, fpath, clients_test_tree) + mock_unlink.assert_called_with(tmpfile) + + mock_open.side_effect = IOError + self.assertRaises(MetadataRuntimeError, + config.write_xml, fpath, clients_test_tree) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch('lxml.etree.parse') + def test_find_xml_for_xpath(self, mock_parse): + config = self.get_config_object("groups.xml") + config.basedata = groups_test_tree + xpath = "//Group[@name='group1']" + self.assertItemsEqual(config.find_xml_for_xpath(xpath), + dict(filename=os.path.join(self.metadata.data, + "groups.xml"), + xmltree=groups_test_tree, + xquery=groups_test_tree.xpath(xpath))) + + self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict()) + + config.extras = ["foo.xml", "bar.xml", "clients.xml"] + + def parse_side_effect(fname): + if fname == os.path.join(self.metadata.data, "clients.xml"): + return clients_test_tree + else: + return lxml.etree.XML("<null/>").getroottree() + + mock_parse.side_effect = parse_side_effect + xpath = "//Client[@secure='true']" + self.assertItemsEqual(config.find_xml_for_xpath(xpath), + dict(filename=os.path.join(self.metadata.data, + "clients.xml"), + xmltree=clients_test_tree, + xquery=clients_test_tree.xpath(xpath))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") + def test_HandleEvent(self, mock_load_xml): + config = self.get_config_object("groups.xml") + evt = Mock() + evt.filename = os.path.join(self.metadata.data, "groups.xml") + evt.code2str = Mock(return_value="changed") + self.assertTrue(config.HandleEvent(evt)) + mock_load_xml.assert_called_with() + + +class TestClientMetadata(unittest.TestCase): + def test_inGroup(self): + cm = ClientMetadata("client1", "group1", ["group1", "group2"], + ["bundle1"], [], [], [], None, None, None) + self.assertTrue(cm.inGroup("group1")) + self.assertFalse(cm.inGroup("group3")) + + +class TestMetadata(unittest.TestCase): + def test__init_no_fam(self): + # test with watch_clients=False + core = Mock() + metadata = get_metadata_object(core=core) + self.check_metadata_object(metadata) + self.assertEqual(metadata.states, dict()) + + def test__init_with_fam(self): + # test with watch_clients=True + core = Mock() + core.fam = Mock() + metadata = get_metadata_object(core=core, watch_clients=True) + self.assertEqual(len(metadata.states), 2) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "groups.xml"), + metadata) + core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data, + "clients.xml"), + metadata) + + core.fam.reset_mock() + core.fam.AddMonitor = Mock(side_effect=IOError) + self.assertRaises(Bcfg2.Server.Plugin.PluginInitError, + get_metadata_object, + core=core, watch_clients=True) + + def check_metadata_object(self, metadata): + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata) + self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics) + self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig) + self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig) + self.assertIsInstance(metadata.query, MetadataQuery) + + @patch('os.makedirs', Mock()) + @patch('__builtin__.open') + def test_init_repo(self, mock_open): + groups = "groups %s" + os_selection = "os" + clients = "clients %s" + Metadata.init_repo(datastore, groups, os_selection, clients) + mock_open.assert_any_call(os.path.join(datastore, "Metadata", + "groups.xml"), "w") + mock_open.assert_any_call(os.path.join(datastore, "Metadata", + "clients.xml"), "w") + + @patch('lxml.etree.parse') + def test_get_groups(self, mock_parse): + metadata = get_metadata_object() + mock_parse.return_value = groups_test_tree + groups = metadata.get_groups() + mock_parse.assert_called_with(os.path.join(datastore, "Metadata", + "groups.xml")) + self.assertIsInstance(groups, lxml.etree._Element) + + def test_search_xdata_name(self): + # test finding a node with the proper name + metadata = get_metadata_object() + tree = groups_test_tree + res = metadata._search_xdata("Group", "group1", tree) + self.assertIsInstance(res, lxml.etree._Element) + self.assertEqual(res.get("name"), "group1") + + def test_search_xdata_alias(self): + # test finding a node with the wrong name but correct alias + metadata = get_metadata_object() + tree = clients_test_tree + res = metadata._search_xdata("Client", "alias3", tree, alias=True) + self.assertIsInstance(res, lxml.etree._Element) + self.assertNotEqual(res.get("name"), "alias3") + + def test_search_xdata_not_found(self): + # test failure finding a node + metadata = get_metadata_object() + tree = clients_test_tree + res = metadata._search_xdata("Client", "bogus_client", tree, alias=True) + self.assertIsNone(res) + + def search_xdata(self, tag, name, tree, alias=False): + metadata = get_metadata_object() + res = metadata._search_xdata(tag, name, tree, alias=alias) + self.assertIsInstance(res, lxml.etree._Element) + if not alias: + self.assertEqual(res.get("name"), name) + + def test_search_group(self): + # test finding a group with the proper name + tree = groups_test_tree + self.search_xdata("Group", "group1", tree) + + def test_search_bundle(self): + # test finding a bundle with the proper name + tree = groups_test_tree + self.search_xdata("Bundle", "bundle1", tree) + + def test_search_client(self): + # test finding a client with the proper name + tree = clients_test_tree + self.search_xdata("Client", "client1", tree, alias=True) + self.search_xdata("Client", "alias1", tree, alias=True) + + def test_add_group(self): + metadata = get_metadata_object() + metadata.groups_xml.write = Mock() + metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.add_group("test1", dict()) + metadata.groups_xml.write.assert_any_call() + grp = metadata.search_group("test1", metadata.groups_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name='test1')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.add_group("test2", dict(foo='bar')) + metadata.groups_xml.write.assert_any_call() + grp = metadata.search_group("test2", metadata.groups_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name='test2', foo='bar')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.groups_xml.write.reset_mock() + self.assertRaises(MetadataConsistencyError, + metadata.add_group, + "test1", dict()) + self.assertFalse(metadata.groups_xml.write.called) + + def test_update_group(self): + metadata = get_metadata_object() + metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.data = copy.deepcopy(groups_test_tree) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.update_group("group1", dict(foo="bar")) + grp = metadata.search_group("group1", metadata.groups_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertIn("foo", grp.attrib) + self.assertEqual(grp.get("foo"), "bar") + self.assertTrue(metadata.groups_xml.write_xml.called) + + self.assertRaises(MetadataConsistencyError, + metadata.update_group, + "bogus_group", dict()) + + def test_remove_group(self): + metadata = get_metadata_object() + metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.data = copy.deepcopy(groups_test_tree) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.remove_group("group5") + grp = metadata.search_group("group5", metadata.groups_xml.base_xdata) + self.assertIsNone(grp) + self.assertTrue(metadata.groups_xml.write_xml.called) + + self.assertRaises(MetadataConsistencyError, + metadata.remove_group, + "bogus_group") + + def test_add_bundle(self): + metadata = get_metadata_object() + metadata.groups_xml.write = Mock() + metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree() + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.add_bundle("bundle1") + metadata.groups_xml.write.assert_any_call() + bundle = metadata.search_bundle("bundle1", + metadata.groups_xml.base_xdata) + self.assertIsNotNone(bundle) + self.assertEqual(bundle.attrib, dict(name='bundle1')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.groups_xml.write.reset_mock() + self.assertRaises(MetadataConsistencyError, + metadata.add_bundle, + "bundle1") + self.assertFalse(metadata.groups_xml.write.called) + + def test_remove_bundle(self): + metadata = get_metadata_object() + metadata.groups_xml.write_xml = Mock() + metadata.groups_xml.data = copy.deepcopy(groups_test_tree) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + + metadata.remove_bundle("bundle1") + grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata) + self.assertIsNone(grp) + self.assertTrue(metadata.groups_xml.write_xml.called) + + self.assertRaises(MetadataConsistencyError, + metadata.remove_bundle, + "bogus_bundle") + + def test_add_client(self): + metadata = get_metadata_object() + metadata.clients_xml.write = Mock() + metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree() + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + metadata.add_client("test1", dict()) + metadata.clients_xml.write.assert_any_call() + grp = metadata.search_client("test1", metadata.clients_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name='test1')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + metadata.add_client("test2", dict(foo='bar')) + metadata.clients_xml.write.assert_any_call() + grp = metadata.search_client("test2", metadata.clients_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertEqual(grp.attrib, dict(name='test2', foo='bar')) + + # have to call this explicitly -- usually load_xml does this + # on FAM events + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + metadata.clients_xml.write.reset_mock() + self.assertRaises(MetadataConsistencyError, + metadata.add_client, + "test1", dict()) + self.assertFalse(metadata.clients_xml.write.called) + + def test_update_client(self): + metadata = get_metadata_object() + metadata.clients_xml.write_xml = Mock() + metadata.clients_xml.data = copy.deepcopy(clients_test_tree) + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + + metadata.update_client("client1", dict(foo="bar")) + grp = metadata.search_client("client1", metadata.clients_xml.base_xdata) + self.assertIsNotNone(grp) + self.assertIn("foo", grp.attrib) + self.assertEqual(grp.get("foo"), "bar") + self.assertTrue(metadata.clients_xml.write_xml.called) + + self.assertRaises(MetadataConsistencyError, + metadata.update_client, + "bogus_client", dict()) + + def load_clients_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = get_metadata_object() + metadata.clients_xml.data = xdata or copy.deepcopy(clients_test_tree) + metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data) + evt = Mock() + evt.filename = os.path.join(datastore, "Metadata", "clients.xml") + evt.code2str = Mock(return_value="changed") + metadata.HandleEvent(evt) + return metadata + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") + def test_clients_xml_event(self, mock_load_xml): + metadata = get_metadata_object() + metadata.profiles = ["group1", "group2"] + self.load_clients_data(metadata=metadata) + mock_load_xml.assert_any_call() + self.assertItemsEqual(metadata.clients, + dict([(c.get("name"), c.get("profile")) + for c in clients_test_tree.findall("//Client")])) + aliases = dict([(a.get("name"), a.getparent().get("name")) + for a in clients_test_tree.findall("//Alias")]) + self.assertItemsEqual(metadata.aliases, aliases) + + raliases = dict([(c.get("name"), set()) + for c in clients_test_tree.findall("//Client")]) + for alias in clients_test_tree.findall("//Alias"): + raliases[alias.getparent().get("name")].add(alias.get("name")) + self.assertItemsEqual(metadata.raliases, raliases) + + self.assertEqual(metadata.bad_clients, dict()) + self.assertEqual(metadata.secure, + [c.get("name") + for c in clients_test_tree.findall("//Client[@secure='true']")]) + self.assertEqual(metadata.floating, ["client1"]) + + addresses = dict([(c.get("address"), []) + for c in clients_test_tree.findall("//*[@address]")]) + raddresses = dict() + for client in clients_test_tree.findall("//Client[@address]"): + addresses[client.get("address")].append(client.get("name")) + try: + raddresses[client.get("name")].append(client.get("address")) + except KeyError: + raddresses[client.get("name")] = [client.get("address")] + for alias in clients_test_tree.findall("//Alias[@address]"): + addresses[alias.get("address")].append(alias.getparent().get("name")) + try: + raddresses[alias.getparent().get("name")].append(alias.get("address")) + except KeyError: + raddresses[alias.getparent().get("name")] = alias.get("address") + + self.assertItemsEqual(metadata.addresses, addresses) + self.assertItemsEqual(metadata.raddresses, raddresses) + self.assertTrue(metadata.states['clients.xml']) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_clients_xml_event_bad_clients(self): + metadata = get_metadata_object() + metadata.profiles = ["group2"] + self.load_clients_data(metadata=metadata) + clients = dict() + badclients = dict() + for client in clients_test_tree.findall("//Client"): + if client.get("profile") in metadata.profiles: + clients[client.get("name")] = client.get("profile") + else: + badclients[client.get("name")] = client.get("profile") + self.assertItemsEqual(metadata.clients, clients) + self.assertItemsEqual(metadata.bad_clients, badclients) + + def load_groups_data(self, metadata=None, xdata=None): + if metadata is None: + metadata = get_metadata_object() + metadata.groups_xml.data = xdata or copy.deepcopy(groups_test_tree) + metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data) + evt = Mock() + evt.filename = os.path.join(datastore, "Metadata", "groups.xml") + evt.code2str = Mock(return_value="changed") + metadata.HandleEvent(evt) + return metadata + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml") + def test_groups_xml_event(self, mock_load_xml): + dup_data = copy.deepcopy(groups_test_tree) + lxml.etree.SubElement(dup_data.getroot(), + "Group", name="group1") + metadata = self.load_groups_data(xdata=dup_data) + mock_load_xml.assert_any_call() + self.assertEqual(metadata.public, ["group1", "group2"]) + self.assertEqual(metadata.private, ["group3"]) + self.assertEqual(metadata.profiles, ["group1", "group2"]) + self.assertItemsEqual(metadata.groups.keys(), + [g.get("name") + for g in groups_test_tree.findall("/Group")]) + self.assertEqual(metadata.categories, + dict(group1="category1", + group2="category1", + group3="category2", + group4="category1")) + self.assertEqual(metadata.default, "group1") + self.assertTrue(metadata.states['groups.xml']) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") + def test_set_profile(self, mock_update_client, mock_add_client): + metadata = get_metadata_object() + metadata.states['clients.xml'] = False + self.assertRaises(MetadataRuntimeError, + metadata.set_profile, + None, None, None) + + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + self.assertRaises(MetadataConsistencyError, + metadata.set_profile, + "client1", "group5", None) + + metadata.clients_xml.write = Mock() + metadata.set_profile("client1", "group2", None) + mock_update_client.assert_called_with("client1", dict(profile="group2")) + metadata.clients_xml.write.assert_any_call() + self.assertEqual(metadata.clients["client1"], "group2") + + metadata.clients_xml.write.reset_mock() + metadata.set_profile("client_new", "group1", None) + mock_add_client.assert_called_with("client_new", dict(profile="group1")) + metadata.clients_xml.write.assert_any_call() + self.assertEqual(metadata.clients["client_new"], "group1") + + metadata.session_cache[('1.2.3.6', None)] = (None, 'client_new2') + metadata.clients_xml.write.reset_mock() + metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None)) + mock_add_client.assert_called_with("client_new2", + dict(uuid='uuid_new', + profile="group1", + address='1.2.3.6')) + metadata.clients_xml.write.assert_any_call() + self.assertEqual(metadata.clients["uuid_new"], "group1") + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("socket.gethostbyaddr") + def test_resolve_client(self, mock_gethostbyaddr): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') + + self.assertRaises(MetadataConsistencyError, + metadata.resolve_client, + ('1.2.3.2', None)) + self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1') + + metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100, + 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3') + self.assertEqual(metadata.resolve_client(('1.2.3.3', None), + cleanup_cache=True), 'client3') + self.assertEqual(metadata.session_cache, dict()) + + mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6']) + self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6') + mock_gethostbyaddr.assert_called_with('1.2.3.6') + + mock_gethostbyaddr.reset_mock() + mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7']) + self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4') + mock_gethostbyaddr.assert_called_with('1.2.3.7') + + mock_gethostbyaddr.reset_mock() + mock_gethostbyaddr.return_value = None + mock_gethostbyaddr.side_effect = socket.herror + self.assertRaises(MetadataConsistencyError, + metadata.resolve_client, + ('1.2.3.8', None)) + mock_gethostbyaddr.assert_called_with('1.2.3.8') + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata") + def test_get_initial_metadata(self, mock_clientmetadata): + metadata = get_metadata_object() + metadata.states['clients.xml'] = False + self.assertRaises(MetadataRuntimeError, + metadata.get_initial_metadata, None) + + self.load_groups_data(metadata=metadata) + self.load_clients_data(metadata=metadata) + + metadata.get_initial_metadata("client1") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client1", "group1", set(["group1"]), set(), set(), + set(["1.2.3.1"]), dict(), None, 'password2')) + + metadata.get_initial_metadata("client2") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client2", "group2", set(["group1", "group2"]), + set(["bundle1", "bundle2"]), set(), + set(["1.2.3.2"]), dict(category1="group1"), + None, None)) + + imd = metadata.get_initial_metadata("alias1") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client3", "group1", set(["group1"]), set(), + set(['alias1']), set(["1.2.3.3"]), dict(), 'uuid1', + 'password2')) + + imd = metadata.get_initial_metadata("client_new") + self.assertEqual(mock_clientmetadata.call_args[0][:9], + ("client_new", "group1", set(["group1"]), set(), + set(), set(), dict(), None, None)) + + metadata.default = None + self.assertRaises(MetadataConsistencyError, + metadata.get_initial_metadata, + "client_new2") + + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_all_group_names(self): + metadata = self.load_groups_data() + self.assertItemsEqual(metadata.get_all_group_names(), + set([g.get("name") + for g in groups_test_tree.findall("//Group")])) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_all_groups_in_category(self): + metadata = self.load_groups_data() + self.assertItemsEqual(metadata.get_all_groups_in_category("category1"), + set([g.get("name") + for g in groups_test_tree.findall("//Group[@category='category1']")])) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_client_names_by_profiles(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + self.assertItemsEqual(metadata.get_client_names_by_profiles("group2"), + [c.get("name") + for c in clients_test_tree.findall("//Client[@profile='group2']")]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_get_client_names_by_groups(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + # this is not the best test in the world, since we mock + # core.build_metadata to just build _initial_ metadata, which + # is not at all the same thing. it turns out that mocking + # this out without starting a Bcfg2 server is pretty + # non-trivial, so this works-ish + metadata.core.build_metadata = Mock() + metadata.core.build_metadata.side_effect = \ + lambda c: metadata.get_initial_metadata(c) + self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]), + [c.get("name") + for c in clients_test_tree.findall("//Client[@profile='group2']")]) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_merge_additional_groups(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + imd = metadata.get_initial_metadata("client2") + + # test adding a group excluded by categories + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group4"]) + self.assertEqual(imd.groups, oldgroups) + + # test adding a private group + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group3"]) + self.assertEqual(imd.groups, oldgroups) + + # test adding groups with bundles + oldgroups = imd.groups + oldbundles = imd.bundles + metadata.merge_additional_groups(imd, ["group7"]) + self.assertEqual(imd.groups, oldgroups.union(["group7"])) + self.assertEqual(imd.bundles, oldbundles.union(["bundle3"])) + + # test adding multiple groups + imd = metadata.get_initial_metadata("client2") + oldgroups = imd.groups + metadata.merge_additional_groups(imd, ["group6", "group8"]) + self.assertItemsEqual(imd.groups, + oldgroups.union(["group6", "group8", "group9"])) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + def test_merge_additional_data(self): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + imd = metadata.get_initial_metadata("client1") + + # we need to use a unique attribute name for this test. this + # is probably overkill, but it works + pattern = "connector%d" + for i in range(0, 100): + connector = pattern % i + if not hasattr(imd, connector): + break + self.assertFalse(hasattr(imd, connector), + "Could not find unique connector name to test " + "merge_additional_data()") + + metadata.merge_additional_data(imd, connector, "test data") + self.assertEqual(getattr(imd, connector), "test data") + self.assertIn(connector, imd.connectors) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_validate_client_address(self, mock_resolve_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + self.assertTrue(metadata.validate_client_address("client1", + (None, None))) + self.assertTrue(metadata.validate_client_address("client2", + ("1.2.3.2", None))) + self.assertFalse(metadata.validate_client_address("client2", + ("1.2.3.8", None))) + self.assertTrue(metadata.validate_client_address("client4", + ("1.2.3.2", None))) + # this is upper case to ensure that case is folded properly in + # validate_client_address() + mock_resolve_client.return_value = "CLIENT4" + self.assertTrue(metadata.validate_client_address("client4", + ("1.2.3.7", None))) + mock_resolve_client.assert_called_with(("1.2.3.7", None)) + + mock_resolve_client.reset_mock() + self.assertFalse(metadata.validate_client_address("client5", + ("1.2.3.5", None))) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address") + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client") + def test_AuthenticateConnection(self, mock_resolve_client, + mock_validate_client_address): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + metadata.password = "password1" + + cert = dict(subject=[[("commonName", "client1")]]) + mock_validate_client_address.return_value = False + self.assertFalse(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.1")) + mock_validate_client_address.return_value = True + self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.1")) + # floating cert-auth clients add themselves to the cache + self.assertIn("1.2.3.1", metadata.session_cache) + self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1") + + cert = dict(subject=[[("commonName", "client7")]]) + self.assertTrue(metadata.AuthenticateConnection(cert, "root", None, + "1.2.3.4")) + # non-floating cert-auth clients do not add themselves to the cache + self.assertNotIn("1.2.3.4", metadata.session_cache) + + cert = dict(subject=[[("commonName", "client8")]]) + + mock_resolve_client.return_value = "client5" + self.assertTrue(metadata.AuthenticateConnection(None, "root", + "password1", "1.2.3.8")) + + mock_resolve_client.side_effect = MetadataConsistencyError + self.assertFalse(metadata.AuthenticateConnection(None, "root", + "password1", + "1.2.3.8")) + + # secure mode, no password + self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None, + "1.2.3.2")) + + self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1', + "password1", "1.2.3.3")) + # non-root, non-cert clients populate session cache + self.assertIn("1.2.3.3", metadata.session_cache) + self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3") + + # use alternate password + self.assertTrue(metadata.AuthenticateConnection(None, 'client3', + "password2", "1.2.3.3")) + + # test secure mode + self.assertFalse(metadata.AuthenticateConnection(None, 'client9', + "password1", + "1.2.3.9")) + self.assertTrue(metadata.AuthenticateConnection(None, 'client9', + "password3", "1.2.3.9")) + + self.assertFalse(metadata.AuthenticateConnection(None, "client5", + "password2", + "1.2.3.7")) + + @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock()) + @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client") + def test_process_statistics(self, mock_update_client): + metadata = self.load_clients_data(metadata=self.load_groups_data()) + md = Mock() + md.hostname = "client6" + metadata.process_statistics(md, None) + mock_update_client.assert_called_with(md.hostname, + dict(auth='cert')) + + mock_update_client.reset_mock() + md.hostname = "client5" + metadata.process_statistics(md, None) + self.assertFalse(mock_update_client.called) + + def test_viz(self): + pass |