summaryrefslogtreecommitdiffstats
path: root/testsuite
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-03-20 13:06:16 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-03-20 13:06:34 -0400
commitd939fc920fe5d69768b017c873f1bbef5f4b2795 (patch)
tree4a47af2c82b60730789132e4b13a9e81765535fc /testsuite
parent60c79ef728f1d932e8da8583e2719b6acd900d9e (diff)
downloadbcfg2-d939fc920fe5d69768b017c873f1bbef5f4b2795.tar.gz
bcfg2-d939fc920fe5d69768b017c873f1bbef5f4b2795.tar.bz2
bcfg2-d939fc920fe5d69768b017c873f1bbef5f4b2795.zip
organized testsuite to mirror src
updated TestOptions removed terrifically out-of-date TestFrame and TestPlugin wrote unit tests for Metadata plugin various tweaks and bugfixes for Metadata plugin
Diffstat (limited to 'testsuite')
-rw-r--r--testsuite/TestFrame.py104
-rw-r--r--testsuite/TestOptions.py103
-rw-r--r--testsuite/TestPlugin.py122
-rw-r--r--testsuite/Testlib/TestOptions.py124
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py898
5 files changed, 1022 insertions, 329 deletions
diff --git a/testsuite/TestFrame.py b/testsuite/TestFrame.py
deleted file mode 100644
index a76c97eec..000000000
--- a/testsuite/TestFrame.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import lxml.etree
-
-import Bcfg2.Client.Frame
-import Bcfg2.Client.Tools
-
-c1 = lxml.etree.XML("<Configuration><Bundle name='foo'><Configfile name='/tmp/test12' owner='root' group='root' empty='true' perms='644'/></Bundle></Configuration>")
-
-c2 = lxml.etree.XML("<Configuration><Bundle name='foo'><Configfile name='/tmp/test12' owner='root' group='root' empty='true' perms='644'/><Configfile name='/tmp/test12' owner='root' group='root' empty='true' perms='644'/></Bundle></Configuration>")
-
-
-class DriverInitFail(object):
- def __init__(self, *args):
- raise Bcfg2.Client.Tools.toolInstantiationError
-
-
-class DriverInventoryFail(object):
- __name__ = 'dif'
-
- def __init__(self, logger, setup, config):
- self.config = config
- self.handled = []
- self.modified = []
- self.extra = []
-
- def Inventory(self):
- raise Error
-
-
-class TestFrame(object):
- def test__init(self):
- setup = {}
- times = {}
- config = lxml.etree.Element('Configuration')
- frame = Bcfg2.Client.Frame.Frame(config, setup, times, [], False)
- assert frame.tools == []
-
- def test__init2(self):
- setup = {}
- times = {}
- frame2 = Bcfg2.Client.Frame.Frame(c1, setup, times, ['POSIX'], False)
- assert len(frame2.tools) == 1
-
- def test__init3(self):
- setup = {}
- times = {}
- frame3 = Bcfg2.Client.Frame.Frame(c2, setup, times, ['foo'], False)
- assert len(frame3.tools) == 0
-
- def test__init4(self):
- setup = {}
- times = {}
- frame = Bcfg2.Client.Frame.Frame(c2, setup, times, [DriverInitFail], False)
- assert len(frame.tools) == 0
-
- def test__Decide_Inventory(self):
- setup = {'remove': 'none',
- 'bundle': [],
- 'interactive': False}
- times = {}
- frame = Bcfg2.Client.Frame.Frame(c2, setup, times,
- [DriverInventoryFail], False)
- assert len(frame.tools) == 1
- frame.Inventory()
- assert len([x for x in list(frame.states.values()) if x]) == 0
- frame.Decide()
- assert len(frame.whitelist)
-
- def test__Decide_Bundle(self):
- setup = {'remove': 'none',
- 'bundle': ['bar'],
- 'interactive': False}
- times = {}
- frame = Bcfg2.Client.Frame.Frame(c2, setup, times,
- [DriverInventoryFail], False)
- assert len(frame.tools) == 1
- frame.Inventory()
- assert len([x for x in list(frame.states.values()) if x]) == 0
- frame.Decide()
- assert len(frame.whitelist) == 0
-
- def test__Decide_Dryrun(self):
- setup = {'remove': 'none',
- 'bundle': [],
- 'interactive': False}
- times = {}
- frame = Bcfg2.Client.Frame.Frame(c2, setup, times,
- [DriverInventoryFail], True)
- assert len(frame.tools) == 1
- frame.Inventory()
- assert len([x for x in list(frame.states.values()) if x]) == 0
- frame.Decide()
- assert len(frame.whitelist) == 0
-
- def test__GenerateStats(self):
- setup = {'remove': 'none',
- 'bundle': [],
- 'interactive': False}
- times = {}
- frame = Bcfg2.Client.Frame.Frame(c2, setup, times,
- [DriverInventoryFail], False)
- frame.Inventory()
- frame.Decide()
- stats = frame.GenerateStats()
- assert len(stats.findall('.//Bad')[0].getchildren()) != 0
diff --git a/testsuite/TestOptions.py b/testsuite/TestOptions.py
deleted file mode 100644
index 735455d45..000000000
--- a/testsuite/TestOptions.py
+++ /dev/null
@@ -1,103 +0,0 @@
-import os
-import sys
-
-import Bcfg2.Options
-
-
-class TestOption(object):
- def test__init(self):
- o = Bcfg2.Options.Option('foo', False, cmd='-F')
- try:
- p = Bcfg2.Options.Option('foo', False, cmd='--F')
- assert False
- except Bcfg2.Options.OptionFailure:
- pass
-
- def test_parse(self):
- o = Bcfg2.Options.Option('foo', 'test4', cmd='-F', env='TEST2',
- odesc='bar', cf=('communication', 'password'))
- o.parse([], ['-F', 'test'])
- assert o.value == 'test'
- o.parse([('-F', 'test2')], [])
- assert o.value == 'test2'
- os.environ['TEST2'] = 'test3'
- o.parse([], [])
- assert o.value == 'test3'
- del os.environ['TEST2']
- o.parse([], [])
- print(o.value)
- assert o.value == 'foobat'
- o.cf = ('communication', 'pwd')
- o.parse([], [])
- print(o.value)
- assert o.value == 'test4'
- o.cf = False
- o.parse([], [])
- assert o.value == 'test4'
-
- def test_cook(self):
- # check that default value isn't cooked
- o1 = Bcfg2.Options.Option('foo', 'test4', cook=Bcfg2.Options.bool_cook)
- o1.parse([], [])
- assert o1.value == 'test4'
- o2 = Bcfg2.Options.Option('foo', False, cmd='-F')
- o2.parse([('-F', '')], [])
- assert o2.value == True
-
-
-class TestOptionSet(object):
- def test_buildGetopt(self):
- opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')),
- ('bar', Bcfg2.Options.Option('foo', 'test2')),
- ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H',
- odesc='1'))]
- os = Bcfg2.Options.OptionSet(opts)
- res = os.buildGetopt()
- assert 'H:' in res and 'G' in res and len(res) == 3
-
- def test_buildLongGetopt(self):
- opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')),
- ('bar', Bcfg2.Options.Option('foo', 'test2')),
- ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='--H',
- odesc='1', long_arg=True))]
- os = Bcfg2.Options.OptionSet(opts)
- res = os.buildLongGetopt()
- print(res)
- assert 'H=' in res and len(res) == 1
-
- def test_parse(self):
- opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')),
- ('bar', Bcfg2.Options.Option('foo', 'test2')),
- ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H',
- odesc='1'))]
- os = Bcfg2.Options.OptionSet(opts)
- try:
- os.parse(['-G', '-H'])
- assert False
- except SystemExit:
- pass
- os2 = Bcfg2.Options.OptionSet(opts)
- try:
- os2.parse(['-h'])
- assert False
- except SystemExit:
- pass
- os3 = Bcfg2.Options.OptionSet(opts)
- os3.parse(['-G'])
- assert os3['foo'] == True
-
-
-class TestOptionParser(object):
- def test__init(self):
- opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-h')),
- ('bar', Bcfg2.Options.Option('foo', 'test2')),
- ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H',
- odesc='1'))]
- os1 = Bcfg2.Options.OptionParser(opts)
- assert Bcfg2.Options.Option.cfpath == '/etc/bcfg2.conf'
- sys.argv = ['foo', '-C', '/usr/local/etc/bcfg2.conf']
- os2 = Bcfg2.Options.OptionParser(opts)
- assert Bcfg2.Options.Option.cfpath == '/usr/local/etc/bcfg2.conf'
- sys.argv = []
- os3 = Bcfg2.Options.OptionParser(opts)
- assert Bcfg2.Options.Option.cfpath == '/etc/bcfg2.conf'
diff --git a/testsuite/TestPlugin.py b/testsuite/TestPlugin.py
deleted file mode 100644
index aa619249a..000000000
--- a/testsuite/TestPlugin.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import gamin
-import lxml.etree
-import os
-
-import Bcfg2.Server.Core
-from Bcfg2.Server.Plugin import EntrySet
-
-
-class es_testtype(object):
- def __init__(self, name, properties, specific):
- self.name = name
- self.properties = properties
- self.specific = specific
- self.handled = 0
- self.built = 0
-
- def handle_event(self, event):
- self.handled += 1
-
- def bind_entry(self, entry, metadata):
- entry.set('bound', '1')
- entry.set('name', self.name)
- self.built += 1
-
-
-class metadata(object):
- def __init__(self, hostname):
- self.hostname = hostname
- self.groups = ['base', 'debian']
-
-#FIXME add test_specific
-
-
-class test_entry_set(object):
- def __init__(self):
- self.dirname = '/tmp/estest-%d' % os.getpid()
- os.path.isdir(self.dirname) or os.mkdir(self.dirname)
- self.metadata = metadata('testhost')
- self.es = EntrySet('template', self.dirname, None, es_testtype)
- self.e = Bcfg2.Server.Core.GaminEvent(1, 'template',
- gamin.GAMExists)
-
- def test_init(self):
- es = self.es
- e = self.e
- e.action = 'exists'
- es.handle_event(e)
- es.handle_event(e)
- assert len(es.entries) == 1
- assert list(es.entries.values())[0].handled == 2
- e.action = 'changed'
- es.handle_event(e)
- assert list(es.entries.values())[0].handled == 3
-
- def test_info(self):
- """Test info and info.xml handling."""
- es = self.es
- e = self.e
- dirname = self.dirname
- metadata = self.metadata
-
- # test 'info' handling
- assert es.metadata['group'] == 'root'
- self.mk_info(dirname)
- e.filename = 'info'
- e.action = 'exists'
- es.handle_event(e)
- assert es.metadata['group'] == 'sys'
- e.action = 'deleted'
- es.handle_event(e)
- assert es.metadata['group'] == 'root'
-
- # test 'info.xml' handling
- assert es.infoxml == None
- self.mk_info_xml(dirname)
- e.filename = 'info.xml'
- e.action = 'exists'
- es.handle_event(e)
- assert es.infoxml
- e.action = 'deleted'
- es.handle_event(e)
- assert es.infoxml == None
-
- def test_file_building(self):
- """Test file building."""
- self.test_init()
- ent = lxml.etree.Element('foo')
- self.es.bind_entry(ent, self.metadata)
- print(list(self.es.entries.values())[0])
- assert list(self.es.entries.values())[0].built == 1
-
- def test_host_specific_file_building(self):
- """Add a host-specific template and build it."""
- self.e.filename = 'template.H_%s' % self.metadata.hostname
- self.e.action = 'exists'
- self.es.handle_event(self.e)
- assert len(self.es.entries) == 1
- ent = lxml.etree.Element('foo')
- self.es.bind_entry(ent, self.metadata)
- # FIXME need to test that it built the _right_ file here
-
- def test_deletion(self):
- """Test deletion of files."""
- self.test_init()
- self.e.filename = 'template'
- self.e.action = 'deleted'
- self.es.handle_event(self.e)
- assert len(self.es.entries) == 0
-
- # TODO - how to clean up the temp dir & files after tests done?
-
- def mk_info(self, dir):
- i = open("%s/info" % dir, 'w')
- i.write('owner: root\n')
- i.write('group: sys\n')
- i.write('perms: 0600\n')
- i.close
-
- def mk_info_xml(self, dir):
- i = open("%s/info.xml" % dir, 'w')
- i.write('<FileInfo><Info owner="root" group="other" perms="0600" /></FileInfo>\n')
- i.close
diff --git a/testsuite/Testlib/TestOptions.py b/testsuite/Testlib/TestOptions.py
new file mode 100644
index 000000000..f5833a54a
--- /dev/null
+++ b/testsuite/Testlib/TestOptions.py
@@ -0,0 +1,124 @@
+import os
+import sys
+import unittest
+from mock import Mock, patch
+import Bcfg2.Options
+
+class TestOption(unittest.TestCase):
+ def test__init(self):
+ self.assertRaises(Bcfg2.Options.OptionFailure,
+ Bcfg2.Options.Option,
+ 'foo', False, cmd='f')
+ self.assertRaises(Bcfg2.Options.OptionFailure,
+ Bcfg2.Options.Option,
+ 'foo', False, cmd='--f')
+ self.assertRaises(Bcfg2.Options.OptionFailure,
+ Bcfg2.Options.Option,
+ 'foo', False, cmd='-foo')
+ self.assertRaises(Bcfg2.Options.OptionFailure,
+ Bcfg2.Options.Option,
+ 'foo', False, cmd='-foo', long_arg=True)
+
+ @patch('ConfigParser.ConfigParser')
+ @patch('__builtin__.open')
+ def test_getCFP(self, mock_open, mock_cp):
+ mock_cp.return_value = Mock()
+ o = Bcfg2.Options.Option('foo', False, cmd='-f')
+ self.assertFalse(o._Option__cfp)
+ o.getCFP()
+ mock_cp.assert_any_call()
+ mock_open.assert_any_call(Bcfg2.Options.DEFAULT_CONFIG_LOCATION)
+ self.assertTrue(mock_cp.return_value.readfp.called)
+
+ @patch('Bcfg2.Options.Option.cfp')
+ def test_parse(self, mock_cfp):
+ cf = ('communication', 'password')
+ o = Bcfg2.Options.Option('foo', 'test4', cmd='-F', env='TEST2',
+ odesc='bar', cf=cf)
+ o.parse([], ['-F', 'test'])
+ self.assertEqual(o.value, 'test')
+ o.parse([('-F', 'test2')], [])
+ self.assertEqual(o.value, 'test2')
+
+ os.environ['TEST2'] = 'test3'
+ o.parse([], [])
+ self.assertEqual(o.value, 'test3')
+ del os.environ['TEST2']
+
+ mock_cfp.get = Mock()
+ mock_cfp.get.return_value = 'test5'
+ o.parse([], [])
+ self.assertEqual(o.value, 'test5')
+ mock_cfp.get.assert_any_call(*cf)
+
+ o.cf = False
+ o.parse([], [])
+ assert o.value == 'test4'
+
+ def test_cook(self):
+ # check that default value isn't cooked
+ o1 = Bcfg2.Options.Option('foo', 'test4', cook=Bcfg2.Options.bool_cook)
+ o1.parse([], [])
+ assert o1.value == 'test4'
+ o2 = Bcfg2.Options.Option('foo', False, cmd='-F')
+ o2.parse([('-F', '')], [])
+ assert o2.value == True
+
+
+class TestOptionSet(unittest.TestCase):
+ def test_buildGetopt(self):
+ opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')),
+ ('bar', Bcfg2.Options.Option('foo', 'test2')),
+ ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H',
+ odesc='1'))]
+ oset = Bcfg2.Options.OptionSet(opts)
+ res = oset.buildGetopt()
+ self.assertIn('H:', res)
+ self.assertIn('G', res)
+ self.assertEqual(len(res), 3)
+
+ def test_buildLongGetopt(self):
+ opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')),
+ ('bar', Bcfg2.Options.Option('foo', 'test2')),
+ ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='--H',
+ odesc='1', long_arg=True))]
+ oset = Bcfg2.Options.OptionSet(opts)
+ res = oset.buildLongGetopt()
+ self.assertIn('H=', res)
+ self.assertEqual(len(res), 1)
+
+ def test_parse(self):
+ opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-G')),
+ ('bar', Bcfg2.Options.Option('foo', 'test2')),
+ ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H',
+ odesc='1'))]
+ oset = Bcfg2.Options.OptionSet(opts)
+ self.assertRaises(SystemExit,
+ oset.parse,
+ ['-G', '-H'])
+ oset2 = Bcfg2.Options.OptionSet(opts)
+ self.assertRaises(SystemExit,
+ oset2.parse,
+ ['-h'])
+ oset3 = Bcfg2.Options.OptionSet(opts)
+ oset3.parse(['-G'])
+ self.assertTrue(oset3['foo'])
+
+
+class TestOptionParser(unittest.TestCase):
+ def test__init(self):
+ opts = [('foo', Bcfg2.Options.Option('foo', 'test1', cmd='-h')),
+ ('bar', Bcfg2.Options.Option('foo', 'test2')),
+ ('baz', Bcfg2.Options.Option('foo', 'test1', cmd='-H',
+ odesc='1'))]
+ oset1 = Bcfg2.Options.OptionParser(opts)
+ self.assertEqual(Bcfg2.Options.Option.cfpath,
+ Bcfg2.Options.DEFAULT_CONFIG_LOCATION)
+ sys.argv = ['foo', '-C', '/usr/local/etc/bcfg2.conf']
+ oset2 = Bcfg2.Options.OptionParser(opts)
+ self.assertEqual(Bcfg2.Options.Option.cfpath,
+ '/usr/local/etc/bcfg2.conf')
+ sys.argv = []
+ oset3 = Bcfg2.Options.OptionParser(opts)
+ self.assertEqual(Bcfg2.Options.Option.cfpath,
+ Bcfg2.Options.DEFAULT_CONFIG_LOCATION)
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
new file mode 100644
index 000000000..4d0b96ee7
--- /dev/null
+++ b/testsuite/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -0,0 +1,898 @@
+import os
+import copy
+import time
+import socket
+import unittest
+import lxml.etree
+from mock import Mock, patch
+import Bcfg2.Server.Plugin
+from Bcfg2.Server.Plugins.Metadata import *
+
+XI_NAMESPACE = "http://www.w3.org/2001/XInclude"
+XI = "{%s}" % XI_NAMESPACE
+
+clients_test_tree = lxml.etree.XML('''
+<Clients>
+ <Client name="client1" address="1.2.3.1" auth="cert"
+ location="floating" password="password2" profile="group1"/>
+ <Client name="client2" address="1.2.3.2" secure="true" profile="group2"/>
+ <Client name="client3" address="1.2.3.3" uuid="uuid1" profile="group1"
+ password="password2">
+ <Alias name="alias1"/>
+ </Client>
+ <Client name="client4" profile="group1">
+ <Alias name="alias2" address="1.2.3.2"/>
+ <Alias name="alias3"/>
+ </Client>
+ <Client name="client5" profile="group1"/>
+ <Client name="client6" profile="group1" auth="bootstrap"/>
+ <Client name="client7" profile="group1" auth="cert" address="1.2.3.4"/>
+ <Client name="client8" profile="group1" auth="cert+password"
+ address="1.2.3.5"/>
+ <Client name="client9" profile="group2" secure="true" password="password3"/>
+</Clients>''').getroottree()
+
+groups_test_tree = lxml.etree.XML('''
+<Groups xmlns:xi="http://www.w3.org/2001/XInclude">
+ <Group name="group1" default="true" profile="true" public="true"
+ category="category1"/>
+ <Group name="group2" profile="true" public="true" category="category1">
+ <Bundle name="bundle1"/>
+ <Bundle name="bundle2"/>
+ <Group name="group1"/>
+ <Group name="group4"/>
+ </Group>
+ <Group name="group3" category="category2" public="false"/>
+ <Group name="group4" category="category1">
+ <Group name="group1"/>
+ <Group name="group6"/>
+ </Group>
+ <Group name="group5"/>
+ <Group name="group7">
+ <Bundle name="bundle3"/>
+ </Group>
+ <Group name="group8">
+ <Group name="group9"/>
+ </Group>
+</Groups>''').getroottree()
+
+datastore = "/"
+
+def get_metadata_object(core=None, watch_clients=False):
+ if core is None:
+ core = Mock()
+ metadata = Metadata(core, datastore, watch_clients=watch_clients)
+ #metadata.logger = Mock()
+ return metadata
+
+
+class TestXMLMetadataConfig(unittest.TestCase):
+ def get_config_object(self, basefile=None, core=None, watch_clients=False):
+ self.metadata = get_metadata_object(core=core,
+ watch_clients=watch_clients)
+ return XMLMetadataConfig(self.metadata, watch_clients, basefile)
+
+ def test_xdata(self):
+ config = self.get_config_object()
+ # we can't use assertRaises here because xdata is a property
+ try:
+ config.xdata
+ except MetadataRuntimeError:
+ pass
+ except:
+ assert False
+ config.data = "<test/>"
+ self.assertEqual(config.xdata, "<test/>")
+
+ def test_base_xdata(self):
+ config = self.get_config_object()
+ # we can't use assertRaises here because base_xdata is a property
+ try:
+ config.base_xdata
+ except MetadataRuntimeError:
+ pass
+ except:
+ assert False
+ config.basedata = "<test/>"
+ self.assertEqual(config.base_xdata, "<test/>")
+
+ def test_add_monitor(self):
+ core = Mock()
+ config = self.get_config_object(core=core)
+ config.extras = []
+ config.add_monitor("test.xml")
+ self.assertFalse(core.fam.AddMonitor.called)
+ self.assertEqual(config.extras, [])
+
+ config = self.get_config_object(core=core, watch_clients=True)
+ config.add_monitor("test.xml")
+ core.fam.AddMonitor.assert_called_with(os.path.join(self.metadata.data,
+ "test.xml"),
+ self.metadata)
+ self.assertItemsEqual(config.extras, ["test.xml"])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.add_monitor")
+ @patch("lxml.etree.parse")
+ def test_load_xml(self, mock_parse, mock_add_monitor):
+ config = self.get_config_object("clients.xml")
+ mock_parse.side_effect = lxml.etree.XMLSyntaxError(None, None, None,
+ None)
+ config.load_xml()
+ self.assertIsNone(config.data)
+ self.assertIsNone(config.basedata)
+
+ config.data = None
+ config.basedata = None
+ mock_parse.side_effect = None
+ mock_parse.return_value.findall = Mock(return_value=[])
+ config.load_xml()
+ self.assertIsNotNone(config.data)
+ self.assertIsNotNone(config.basedata)
+
+ config.data = None
+ config.basedata = None
+ mock_parse.return_value.findall = \
+ Mock(return_value=[lxml.etree.Element(XI + "include",
+ href="more.xml"),
+ lxml.etree.Element(XI + "include",
+ href="evenmore.xml")])
+ config.load_xml()
+ mock_add_monitor.assert_any_call("more.xml")
+ mock_add_monitor.assert_any_call("evenmore.xml")
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml")
+ def test_write(self, mock_write_xml):
+ config = self.get_config_object("clients.xml")
+ config.basedata = "<test/>"
+ config.write()
+ mock_write_xml.assert_called_with(os.path.join(self.metadata.data,
+ "clients.xml"),
+ "<test/>")
+
+ @patch('Bcfg2.Server.Plugins.Metadata.locked', Mock(return_value=False))
+ @patch('fcntl.lockf', Mock())
+ @patch('__builtin__.open')
+ @patch('os.unlink')
+ @patch('os.rename')
+ @patch('os.path.islink')
+ @patch('os.readlink')
+ def test_write_xml(self, mock_readlink, mock_islink, mock_rename,
+ mock_unlink, mock_open):
+ fname = "clients.xml"
+ config = self.get_config_object(fname)
+ fpath = os.path.join(self.metadata.data, fname)
+ tmpfile = "%s.new" % fpath
+ linkdest = os.path.join(self.metadata.data, "client-link.xml")
+
+ mock_islink.return_value = False
+
+ config.write_xml(fpath, clients_test_tree)
+ mock_open.assert_called_with(tmpfile, "w")
+ self.assertTrue(mock_open.return_value.write.called)
+ mock_islink.assert_called_with(fpath)
+ mock_rename.assert_called_with(tmpfile, fpath)
+
+ mock_islink.return_value = True
+ mock_readlink.return_value = linkdest
+ config.write_xml(fpath, clients_test_tree)
+ mock_rename.assert_called_with(tmpfile, linkdest)
+
+ mock_rename.side_effect = OSError
+ self.assertRaises(MetadataRuntimeError,
+ config.write_xml, fpath, clients_test_tree)
+
+ mock_open.return_value.write.side_effect = IOError
+ self.assertRaises(MetadataRuntimeError,
+ config.write_xml, fpath, clients_test_tree)
+ mock_unlink.assert_called_with(tmpfile)
+
+ mock_open.side_effect = IOError
+ self.assertRaises(MetadataRuntimeError,
+ config.write_xml, fpath, clients_test_tree)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch('lxml.etree.parse')
+ def test_find_xml_for_xpath(self, mock_parse):
+ config = self.get_config_object("groups.xml")
+ config.basedata = groups_test_tree
+ xpath = "//Group[@name='group1']"
+ self.assertItemsEqual(config.find_xml_for_xpath(xpath),
+ dict(filename=os.path.join(self.metadata.data,
+ "groups.xml"),
+ xmltree=groups_test_tree,
+ xquery=groups_test_tree.xpath(xpath)))
+
+ self.assertEqual(config.find_xml_for_xpath("//boguselement"), dict())
+
+ config.extras = ["foo.xml", "bar.xml", "clients.xml"]
+
+ def parse_side_effect(fname):
+ if fname == os.path.join(self.metadata.data, "clients.xml"):
+ return clients_test_tree
+ else:
+ return lxml.etree.XML("<null/>").getroottree()
+
+ mock_parse.side_effect = parse_side_effect
+ xpath = "//Client[@secure='true']"
+ self.assertItemsEqual(config.find_xml_for_xpath(xpath),
+ dict(filename=os.path.join(self.metadata.data,
+ "clients.xml"),
+ xmltree=clients_test_tree,
+ xquery=clients_test_tree.xpath(xpath)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
+ def test_HandleEvent(self, mock_load_xml):
+ config = self.get_config_object("groups.xml")
+ evt = Mock()
+ evt.filename = os.path.join(self.metadata.data, "groups.xml")
+ evt.code2str = Mock(return_value="changed")
+ self.assertTrue(config.HandleEvent(evt))
+ mock_load_xml.assert_called_with()
+
+
+class TestClientMetadata(unittest.TestCase):
+ def test_inGroup(self):
+ cm = ClientMetadata("client1", "group1", ["group1", "group2"],
+ ["bundle1"], [], [], [], None, None, None)
+ self.assertTrue(cm.inGroup("group1"))
+ self.assertFalse(cm.inGroup("group3"))
+
+
+class TestMetadata(unittest.TestCase):
+ def test__init_no_fam(self):
+ # test with watch_clients=False
+ core = Mock()
+ metadata = get_metadata_object(core=core)
+ self.check_metadata_object(metadata)
+ self.assertEqual(metadata.states, dict())
+
+ def test__init_with_fam(self):
+ # test with watch_clients=True
+ core = Mock()
+ core.fam = Mock()
+ metadata = get_metadata_object(core=core, watch_clients=True)
+ self.assertEqual(len(metadata.states), 2)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "groups.xml"),
+ metadata)
+ core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
+ "clients.xml"),
+ metadata)
+
+ core.fam.reset_mock()
+ core.fam.AddMonitor = Mock(side_effect=IOError)
+ self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
+ get_metadata_object,
+ core=core, watch_clients=True)
+
+ def check_metadata_object(self, metadata):
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
+ self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Statistics)
+ self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
+ self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
+ self.assertIsInstance(metadata.query, MetadataQuery)
+
+ @patch('os.makedirs', Mock())
+ @patch('__builtin__.open')
+ def test_init_repo(self, mock_open):
+ groups = "groups %s"
+ os_selection = "os"
+ clients = "clients %s"
+ Metadata.init_repo(datastore, groups, os_selection, clients)
+ mock_open.assert_any_call(os.path.join(datastore, "Metadata",
+ "groups.xml"), "w")
+ mock_open.assert_any_call(os.path.join(datastore, "Metadata",
+ "clients.xml"), "w")
+
+ @patch('lxml.etree.parse')
+ def test_get_groups(self, mock_parse):
+ metadata = get_metadata_object()
+ mock_parse.return_value = groups_test_tree
+ groups = metadata.get_groups()
+ mock_parse.assert_called_with(os.path.join(datastore, "Metadata",
+ "groups.xml"))
+ self.assertIsInstance(groups, lxml.etree._Element)
+
+ def test_search_xdata_name(self):
+ # test finding a node with the proper name
+ metadata = get_metadata_object()
+ tree = groups_test_tree
+ res = metadata._search_xdata("Group", "group1", tree)
+ self.assertIsInstance(res, lxml.etree._Element)
+ self.assertEqual(res.get("name"), "group1")
+
+ def test_search_xdata_alias(self):
+ # test finding a node with the wrong name but correct alias
+ metadata = get_metadata_object()
+ tree = clients_test_tree
+ res = metadata._search_xdata("Client", "alias3", tree, alias=True)
+ self.assertIsInstance(res, lxml.etree._Element)
+ self.assertNotEqual(res.get("name"), "alias3")
+
+ def test_search_xdata_not_found(self):
+ # test failure finding a node
+ metadata = get_metadata_object()
+ tree = clients_test_tree
+ res = metadata._search_xdata("Client", "bogus_client", tree, alias=True)
+ self.assertIsNone(res)
+
+ def search_xdata(self, tag, name, tree, alias=False):
+ metadata = get_metadata_object()
+ res = metadata._search_xdata(tag, name, tree, alias=alias)
+ self.assertIsInstance(res, lxml.etree._Element)
+ if not alias:
+ self.assertEqual(res.get("name"), name)
+
+ def test_search_group(self):
+ # test finding a group with the proper name
+ tree = groups_test_tree
+ self.search_xdata("Group", "group1", tree)
+
+ def test_search_bundle(self):
+ # test finding a bundle with the proper name
+ tree = groups_test_tree
+ self.search_xdata("Bundle", "bundle1", tree)
+
+ def test_search_client(self):
+ # test finding a client with the proper name
+ tree = clients_test_tree
+ self.search_xdata("Client", "client1", tree, alias=True)
+ self.search_xdata("Client", "alias1", tree, alias=True)
+
+ def test_add_group(self):
+ metadata = get_metadata_object()
+ metadata.groups_xml.write = Mock()
+ metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.add_group("test1", dict())
+ metadata.groups_xml.write.assert_any_call()
+ grp = metadata.search_group("test1", metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name='test1'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.add_group("test2", dict(foo='bar'))
+ metadata.groups_xml.write.assert_any_call()
+ grp = metadata.search_group("test2", metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.groups_xml.write.reset_mock()
+ self.assertRaises(MetadataConsistencyError,
+ metadata.add_group,
+ "test1", dict())
+ self.assertFalse(metadata.groups_xml.write.called)
+
+ def test_update_group(self):
+ metadata = get_metadata_object()
+ metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.update_group("group1", dict(foo="bar"))
+ grp = metadata.search_group("group1", metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertIn("foo", grp.attrib)
+ self.assertEqual(grp.get("foo"), "bar")
+ self.assertTrue(metadata.groups_xml.write_xml.called)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.update_group,
+ "bogus_group", dict())
+
+ def test_remove_group(self):
+ metadata = get_metadata_object()
+ metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.remove_group("group5")
+ grp = metadata.search_group("group5", metadata.groups_xml.base_xdata)
+ self.assertIsNone(grp)
+ self.assertTrue(metadata.groups_xml.write_xml.called)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.remove_group,
+ "bogus_group")
+
+ def test_add_bundle(self):
+ metadata = get_metadata_object()
+ metadata.groups_xml.write = Mock()
+ metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.add_bundle("bundle1")
+ metadata.groups_xml.write.assert_any_call()
+ bundle = metadata.search_bundle("bundle1",
+ metadata.groups_xml.base_xdata)
+ self.assertIsNotNone(bundle)
+ self.assertEqual(bundle.attrib, dict(name='bundle1'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.groups_xml.write.reset_mock()
+ self.assertRaises(MetadataConsistencyError,
+ metadata.add_bundle,
+ "bundle1")
+ self.assertFalse(metadata.groups_xml.write.called)
+
+ def test_remove_bundle(self):
+ metadata = get_metadata_object()
+ metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.data = copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+
+ metadata.remove_bundle("bundle1")
+ grp = metadata.search_bundle("bundle1", metadata.groups_xml.base_xdata)
+ self.assertIsNone(grp)
+ self.assertTrue(metadata.groups_xml.write_xml.called)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.remove_bundle,
+ "bogus_bundle")
+
+ def test_add_client(self):
+ metadata = get_metadata_object()
+ metadata.clients_xml.write = Mock()
+ metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ metadata.add_client("test1", dict())
+ metadata.clients_xml.write.assert_any_call()
+ grp = metadata.search_client("test1", metadata.clients_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name='test1'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ metadata.add_client("test2", dict(foo='bar'))
+ metadata.clients_xml.write.assert_any_call()
+ grp = metadata.search_client("test2", metadata.clients_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertEqual(grp.attrib, dict(name='test2', foo='bar'))
+
+ # have to call this explicitly -- usually load_xml does this
+ # on FAM events
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ metadata.clients_xml.write.reset_mock()
+ self.assertRaises(MetadataConsistencyError,
+ metadata.add_client,
+ "test1", dict())
+ self.assertFalse(metadata.clients_xml.write.called)
+
+ def test_update_client(self):
+ metadata = get_metadata_object()
+ metadata.clients_xml.write_xml = Mock()
+ metadata.clients_xml.data = copy.deepcopy(clients_test_tree)
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+
+ metadata.update_client("client1", dict(foo="bar"))
+ grp = metadata.search_client("client1", metadata.clients_xml.base_xdata)
+ self.assertIsNotNone(grp)
+ self.assertIn("foo", grp.attrib)
+ self.assertEqual(grp.get("foo"), "bar")
+ self.assertTrue(metadata.clients_xml.write_xml.called)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.update_client,
+ "bogus_client", dict())
+
+ def load_clients_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = get_metadata_object()
+ metadata.clients_xml.data = xdata or copy.deepcopy(clients_test_tree)
+ metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "clients.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ return metadata
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
+ def test_clients_xml_event(self, mock_load_xml):
+ metadata = get_metadata_object()
+ metadata.profiles = ["group1", "group2"]
+ self.load_clients_data(metadata=metadata)
+ mock_load_xml.assert_any_call()
+ self.assertItemsEqual(metadata.clients,
+ dict([(c.get("name"), c.get("profile"))
+ for c in clients_test_tree.findall("//Client")]))
+ aliases = dict([(a.get("name"), a.getparent().get("name"))
+ for a in clients_test_tree.findall("//Alias")])
+ self.assertItemsEqual(metadata.aliases, aliases)
+
+ raliases = dict([(c.get("name"), set())
+ for c in clients_test_tree.findall("//Client")])
+ for alias in clients_test_tree.findall("//Alias"):
+ raliases[alias.getparent().get("name")].add(alias.get("name"))
+ self.assertItemsEqual(metadata.raliases, raliases)
+
+ self.assertEqual(metadata.bad_clients, dict())
+ self.assertEqual(metadata.secure,
+ [c.get("name")
+ for c in clients_test_tree.findall("//Client[@secure='true']")])
+ self.assertEqual(metadata.floating, ["client1"])
+
+ addresses = dict([(c.get("address"), [])
+ for c in clients_test_tree.findall("//*[@address]")])
+ raddresses = dict()
+ for client in clients_test_tree.findall("//Client[@address]"):
+ addresses[client.get("address")].append(client.get("name"))
+ try:
+ raddresses[client.get("name")].append(client.get("address"))
+ except KeyError:
+ raddresses[client.get("name")] = [client.get("address")]
+ for alias in clients_test_tree.findall("//Alias[@address]"):
+ addresses[alias.get("address")].append(alias.getparent().get("name"))
+ try:
+ raddresses[alias.getparent().get("name")].append(alias.get("address"))
+ except KeyError:
+ raddresses[alias.getparent().get("name")] = alias.get("address")
+
+ self.assertItemsEqual(metadata.addresses, addresses)
+ self.assertItemsEqual(metadata.raddresses, raddresses)
+ self.assertTrue(metadata.states['clients.xml'])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_clients_xml_event_bad_clients(self):
+ metadata = get_metadata_object()
+ metadata.profiles = ["group2"]
+ self.load_clients_data(metadata=metadata)
+ clients = dict()
+ badclients = dict()
+ for client in clients_test_tree.findall("//Client"):
+ if client.get("profile") in metadata.profiles:
+ clients[client.get("name")] = client.get("profile")
+ else:
+ badclients[client.get("name")] = client.get("profile")
+ self.assertItemsEqual(metadata.clients, clients)
+ self.assertItemsEqual(metadata.bad_clients, badclients)
+
+ def load_groups_data(self, metadata=None, xdata=None):
+ if metadata is None:
+ metadata = get_metadata_object()
+ metadata.groups_xml.data = xdata or copy.deepcopy(groups_test_tree)
+ metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
+ evt = Mock()
+ evt.filename = os.path.join(datastore, "Metadata", "groups.xml")
+ evt.code2str = Mock(return_value="changed")
+ metadata.HandleEvent(evt)
+ return metadata
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml")
+ def test_groups_xml_event(self, mock_load_xml):
+ dup_data = copy.deepcopy(groups_test_tree)
+ lxml.etree.SubElement(dup_data.getroot(),
+ "Group", name="group1")
+ metadata = self.load_groups_data(xdata=dup_data)
+ mock_load_xml.assert_any_call()
+ self.assertEqual(metadata.public, ["group1", "group2"])
+ self.assertEqual(metadata.private, ["group3"])
+ self.assertEqual(metadata.profiles, ["group1", "group2"])
+ self.assertItemsEqual(metadata.groups.keys(),
+ [g.get("name")
+ for g in groups_test_tree.findall("/Group")])
+ self.assertEqual(metadata.categories,
+ dict(group1="category1",
+ group2="category1",
+ group3="category2",
+ group4="category1"))
+ self.assertEqual(metadata.default, "group1")
+ self.assertTrue(metadata.states['groups.xml'])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.add_client")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
+ def test_set_profile(self, mock_update_client, mock_add_client):
+ metadata = get_metadata_object()
+ metadata.states['clients.xml'] = False
+ self.assertRaises(MetadataRuntimeError,
+ metadata.set_profile,
+ None, None, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.set_profile,
+ "client1", "group5", None)
+
+ metadata.clients_xml.write = Mock()
+ metadata.set_profile("client1", "group2", None)
+ mock_update_client.assert_called_with("client1", dict(profile="group2"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clients["client1"], "group2")
+
+ metadata.clients_xml.write.reset_mock()
+ metadata.set_profile("client_new", "group1", None)
+ mock_add_client.assert_called_with("client_new", dict(profile="group1"))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clients["client_new"], "group1")
+
+ metadata.session_cache[('1.2.3.6', None)] = (None, 'client_new2')
+ metadata.clients_xml.write.reset_mock()
+ metadata.set_profile("uuid_new", "group1", ('1.2.3.6', None))
+ mock_add_client.assert_called_with("client_new2",
+ dict(uuid='uuid_new',
+ profile="group1",
+ address='1.2.3.6'))
+ metadata.clients_xml.write.assert_any_call()
+ self.assertEqual(metadata.clients["uuid_new"], "group1")
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("socket.gethostbyaddr")
+ def test_resolve_client(self, mock_gethostbyaddr):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.session_cache[('1.2.3.3', None)] = (time.time(), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+
+ self.assertRaises(MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.2', None))
+ self.assertEqual(metadata.resolve_client(('1.2.3.1', None)), 'client1')
+
+ metadata.session_cache[('1.2.3.3', None)] = (time.time() - 100,
+ 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None)), 'client3')
+ self.assertEqual(metadata.resolve_client(('1.2.3.3', None),
+ cleanup_cache=True), 'client3')
+ self.assertEqual(metadata.session_cache, dict())
+
+ mock_gethostbyaddr.return_value = ('client6', [], ['1.2.3.6'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.6', None)), 'client6')
+ mock_gethostbyaddr.assert_called_with('1.2.3.6')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = ('alias3', [], ['1.2.3.7'])
+ self.assertEqual(metadata.resolve_client(('1.2.3.7', None)), 'client4')
+ mock_gethostbyaddr.assert_called_with('1.2.3.7')
+
+ mock_gethostbyaddr.reset_mock()
+ mock_gethostbyaddr.return_value = None
+ mock_gethostbyaddr.side_effect = socket.herror
+ self.assertRaises(MetadataConsistencyError,
+ metadata.resolve_client,
+ ('1.2.3.8', None))
+ mock_gethostbyaddr.assert_called_with('1.2.3.8')
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.write_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.ClientMetadata")
+ def test_get_initial_metadata(self, mock_clientmetadata):
+ metadata = get_metadata_object()
+ metadata.states['clients.xml'] = False
+ self.assertRaises(MetadataRuntimeError,
+ metadata.get_initial_metadata, None)
+
+ self.load_groups_data(metadata=metadata)
+ self.load_clients_data(metadata=metadata)
+
+ metadata.get_initial_metadata("client1")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client1", "group1", set(["group1"]), set(), set(),
+ set(["1.2.3.1"]), dict(), None, 'password2'))
+
+ metadata.get_initial_metadata("client2")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client2", "group2", set(["group1", "group2"]),
+ set(["bundle1", "bundle2"]), set(),
+ set(["1.2.3.2"]), dict(category1="group1"),
+ None, None))
+
+ imd = metadata.get_initial_metadata("alias1")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client3", "group1", set(["group1"]), set(),
+ set(['alias1']), set(["1.2.3.3"]), dict(), 'uuid1',
+ 'password2'))
+
+ imd = metadata.get_initial_metadata("client_new")
+ self.assertEqual(mock_clientmetadata.call_args[0][:9],
+ ("client_new", "group1", set(["group1"]), set(),
+ set(), set(), dict(), None, None))
+
+ metadata.default = None
+ self.assertRaises(MetadataConsistencyError,
+ metadata.get_initial_metadata,
+ "client_new2")
+
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_all_group_names(self):
+ metadata = self.load_groups_data()
+ self.assertItemsEqual(metadata.get_all_group_names(),
+ set([g.get("name")
+ for g in groups_test_tree.findall("//Group")]))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_all_groups_in_category(self):
+ metadata = self.load_groups_data()
+ self.assertItemsEqual(metadata.get_all_groups_in_category("category1"),
+ set([g.get("name")
+ for g in groups_test_tree.findall("//Group[@category='category1']")]))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_client_names_by_profiles(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ self.assertItemsEqual(metadata.get_client_names_by_profiles("group2"),
+ [c.get("name")
+ for c in clients_test_tree.findall("//Client[@profile='group2']")])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_get_client_names_by_groups(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ # this is not the best test in the world, since we mock
+ # core.build_metadata to just build _initial_ metadata, which
+ # is not at all the same thing. it turns out that mocking
+ # this out without starting a Bcfg2 server is pretty
+ # non-trivial, so this works-ish
+ metadata.core.build_metadata = Mock()
+ metadata.core.build_metadata.side_effect = \
+ lambda c: metadata.get_initial_metadata(c)
+ self.assertItemsEqual(metadata.get_client_names_by_groups(["group2"]),
+ [c.get("name")
+ for c in clients_test_tree.findall("//Client[@profile='group2']")])
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_merge_additional_groups(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ imd = metadata.get_initial_metadata("client2")
+
+ # test adding a group excluded by categories
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group4"])
+ self.assertEqual(imd.groups, oldgroups)
+
+ # test adding a private group
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group3"])
+ self.assertEqual(imd.groups, oldgroups)
+
+ # test adding groups with bundles
+ oldgroups = imd.groups
+ oldbundles = imd.bundles
+ metadata.merge_additional_groups(imd, ["group7"])
+ self.assertEqual(imd.groups, oldgroups.union(["group7"]))
+ self.assertEqual(imd.bundles, oldbundles.union(["bundle3"]))
+
+ # test adding multiple groups
+ imd = metadata.get_initial_metadata("client2")
+ oldgroups = imd.groups
+ metadata.merge_additional_groups(imd, ["group6", "group8"])
+ self.assertItemsEqual(imd.groups,
+ oldgroups.union(["group6", "group8", "group9"]))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ def test_merge_additional_data(self):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ imd = metadata.get_initial_metadata("client1")
+
+ # we need to use a unique attribute name for this test. this
+ # is probably overkill, but it works
+ pattern = "connector%d"
+ for i in range(0, 100):
+ connector = pattern % i
+ if not hasattr(imd, connector):
+ break
+ self.assertFalse(hasattr(imd, connector),
+ "Could not find unique connector name to test "
+ "merge_additional_data()")
+
+ metadata.merge_additional_data(imd, connector, "test data")
+ self.assertEqual(getattr(imd, connector), "test data")
+ self.assertIn(connector, imd.connectors)
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_validate_client_address(self, mock_resolve_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ self.assertTrue(metadata.validate_client_address("client1",
+ (None, None)))
+ self.assertTrue(metadata.validate_client_address("client2",
+ ("1.2.3.2", None)))
+ self.assertFalse(metadata.validate_client_address("client2",
+ ("1.2.3.8", None)))
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.2", None)))
+ # this is upper case to ensure that case is folded properly in
+ # validate_client_address()
+ mock_resolve_client.return_value = "CLIENT4"
+ self.assertTrue(metadata.validate_client_address("client4",
+ ("1.2.3.7", None)))
+ mock_resolve_client.assert_called_with(("1.2.3.7", None))
+
+ mock_resolve_client.reset_mock()
+ self.assertFalse(metadata.validate_client_address("client5",
+ ("1.2.3.5", None)))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.validate_client_address")
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.resolve_client")
+ def test_AuthenticateConnection(self, mock_resolve_client,
+ mock_validate_client_address):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ metadata.password = "password1"
+
+ cert = dict(subject=[[("commonName", "client1")]])
+ mock_validate_client_address.return_value = False
+ self.assertFalse(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ mock_validate_client_address.return_value = True
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.1"))
+ # floating cert-auth clients add themselves to the cache
+ self.assertIn("1.2.3.1", metadata.session_cache)
+ self.assertEqual(metadata.session_cache["1.2.3.1"][1], "client1")
+
+ cert = dict(subject=[[("commonName", "client7")]])
+ self.assertTrue(metadata.AuthenticateConnection(cert, "root", None,
+ "1.2.3.4"))
+ # non-floating cert-auth clients do not add themselves to the cache
+ self.assertNotIn("1.2.3.4", metadata.session_cache)
+
+ cert = dict(subject=[[("commonName", "client8")]])
+
+ mock_resolve_client.return_value = "client5"
+ self.assertTrue(metadata.AuthenticateConnection(None, "root",
+ "password1", "1.2.3.8"))
+
+ mock_resolve_client.side_effect = MetadataConsistencyError
+ self.assertFalse(metadata.AuthenticateConnection(None, "root",
+ "password1",
+ "1.2.3.8"))
+
+ # secure mode, no password
+ self.assertFalse(metadata.AuthenticateConnection(None, 'client2', None,
+ "1.2.3.2"))
+
+ self.assertTrue(metadata.AuthenticateConnection(None, 'uuid1',
+ "password1", "1.2.3.3"))
+ # non-root, non-cert clients populate session cache
+ self.assertIn("1.2.3.3", metadata.session_cache)
+ self.assertEqual(metadata.session_cache["1.2.3.3"][1], "client3")
+
+ # use alternate password
+ self.assertTrue(metadata.AuthenticateConnection(None, 'client3',
+ "password2", "1.2.3.3"))
+
+ # test secure mode
+ self.assertFalse(metadata.AuthenticateConnection(None, 'client9',
+ "password1",
+ "1.2.3.9"))
+ self.assertTrue(metadata.AuthenticateConnection(None, 'client9',
+ "password3", "1.2.3.9"))
+
+ self.assertFalse(metadata.AuthenticateConnection(None, "client5",
+ "password2",
+ "1.2.3.7"))
+
+ @patch("Bcfg2.Server.Plugins.Metadata.XMLMetadataConfig.load_xml", Mock())
+ @patch("Bcfg2.Server.Plugins.Metadata.Metadata.update_client")
+ def test_process_statistics(self, mock_update_client):
+ metadata = self.load_clients_data(metadata=self.load_groups_data())
+ md = Mock()
+ md.hostname = "client6"
+ metadata.process_statistics(md, None)
+ mock_update_client.assert_called_with(md.hostname,
+ dict(auth='cert'))
+
+ mock_update_client.reset_mock()
+ md.hostname = "client5"
+ metadata.process_statistics(md, None)
+ self.assertFalse(mock_update_client.called)
+
+ def test_viz(self):
+ pass