diff options
Diffstat (limited to 'testsuite')
-rw-r--r-- | testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py | 178 |
1 files changed, 118 insertions, 60 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py index 2163aa037..1022bdc5a 100644 --- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py +++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py @@ -1,5 +1,6 @@ import os import sys +import copy import time import lxml.etree import Bcfg2.Server @@ -25,6 +26,47 @@ test_data = dict(a=1, b=[1, 2, 3], c="test", d=dict(a=1, b=dict(a=1), c=(1, "2", 3))) +class FakeElement(lxml.etree._Element): + getroottree = Mock() + + def __init__(self, el): + self._element = el + + def __getattribute__(self, attr): + el = lxml.etree._Element.__getattribute__(self, + '__dict__')['_element'] + if attr == "getroottree": + return FakeElement.getroottree + elif attr == "_element": + return el + else: + return getattr(el, attr) + + +class StoringElement(object): + OriginalElement = copy.copy(lxml.etree.Element) + + def __init__(self): + self.element = None + self.return_value = None + + def __call__(self, *args, **kwargs): + self.element = self.OriginalElement(*args, **kwargs) + self.return_value = FakeElement(self.element) + return self.return_value + + +class StoringSubElement(object): + OriginalSubElement = copy.copy(lxml.etree.SubElement) + + def __call__(self, parent, tag, **kwargs): + try: + return self.OriginalSubElement(parent._element, tag, + **kwargs) + except AttributeError: + return self.OriginalSubElement(parent, tag, **kwargs) + + class FakeList(list): pass @@ -288,61 +330,71 @@ text probes._write_data_db.assert_called_with("test") self.assertFalse(probes._write_data_xml.called) - @patch("%s.open" % builtins) - def test__write_data_xml(self, mock_open): + def test__write_data_xml(self): probes = self.get_probes_object(use_db=False) probes.probedata = self.get_test_probedata() probes.cgroups = self.get_test_cgroups() - probes._write_data_xml(None) - - mock_open.assert_called_with(os.path.join(datastore, probes.name, - "probed.xml"), "w") - data = lxml.etree.XML(mock_open.return_value.write.call_args[0][0]) - self.assertEqual(len(data.xpath("//Client")), 2) - - foodata = data.find("Client[@name='foo.example.com']") - self.assertIsNotNone(foodata) - self.assertIsNotNone(foodata.get("timestamp")) - self.assertEqual(len(foodata.findall("Probe")), - len(probes.probedata['foo.example.com'])) - self.assertEqual(len(foodata.findall("Group")), - len(probes.cgroups['foo.example.com'])) - xml = foodata.find("Probe[@name='xml']") - self.assertIsNotNone(xml) - self.assertIsNotNone(xml.get("value")) - xdata = lxml.etree.XML(xml.get("value")) - self.assertIsNotNone(xdata) - self.assertIsNotNone(xdata.find("test")) - self.assertEqual(xdata.find("test").get("foo"), "foo") - text = foodata.find("Probe[@name='text']") - self.assertIsNotNone(text) - self.assertIsNotNone(text.get("value")) - multiline = foodata.find("Probe[@name='multiline']") - self.assertIsNotNone(multiline) - self.assertIsNotNone(multiline.get("value")) - self.assertGreater(len(multiline.get("value").splitlines()), 1) - - bardata = data.find("Client[@name='bar.example.com']") - self.assertIsNotNone(bardata) - self.assertIsNotNone(bardata.get("timestamp")) - self.assertEqual(len(bardata.findall("Probe")), - len(probes.probedata['bar.example.com'])) - self.assertEqual(len(bardata.findall("Group")), - len(probes.cgroups['bar.example.com'])) - empty = bardata.find("Probe[@name='empty']") - self.assertIsNotNone(empty) - self.assertIsNotNone(empty.get("value")) - self.assertEqual(empty.get("value"), "") - if HAS_JSON: - jdata = bardata.find("Probe[@name='json']") - self.assertIsNotNone(jdata) - self.assertIsNotNone(jdata.get("value")) - self.assertItemsEqual(test_data, json.loads(jdata.get("value"))) - if HAS_YAML: - ydata = bardata.find("Probe[@name='yaml']") - self.assertIsNotNone(ydata) - self.assertIsNotNone(ydata.get("value")) - self.assertItemsEqual(test_data, yaml.load(ydata.get("value"))) + + @patch("lxml.etree.Element") + @patch("lxml.etree.SubElement", StoringSubElement()) + def inner(mock_Element): + mock_Element.side_effect = StoringElement() + probes._write_data_xml(None) + + top = mock_Element.side_effect.return_value + write = top.getroottree.return_value.write + self.assertEqual(write.call_args[0][0], + os.path.join(datastore, probes.name, + "probed.xml")) + + data = top._element + foodata = data.find("Client[@name='foo.example.com']") + self.assertIsNotNone(foodata) + self.assertIsNotNone(foodata.get("timestamp")) + self.assertEqual(len(foodata.findall("Probe")), + len(probes.probedata['foo.example.com'])) + self.assertEqual(len(foodata.findall("Group")), + len(probes.cgroups['foo.example.com'])) + xml = foodata.find("Probe[@name='xml']") + self.assertIsNotNone(xml) + self.assertIsNotNone(xml.get("value")) + xdata = lxml.etree.XML(xml.get("value")) + self.assertIsNotNone(xdata) + self.assertIsNotNone(xdata.find("test")) + self.assertEqual(xdata.find("test").get("foo"), "foo") + text = foodata.find("Probe[@name='text']") + self.assertIsNotNone(text) + self.assertIsNotNone(text.get("value")) + multiline = foodata.find("Probe[@name='multiline']") + self.assertIsNotNone(multiline) + self.assertIsNotNone(multiline.get("value")) + self.assertGreater(len(multiline.get("value").splitlines()), 1) + + bardata = data.find("Client[@name='bar.example.com']") + self.assertIsNotNone(bardata) + self.assertIsNotNone(bardata.get("timestamp")) + self.assertEqual(len(bardata.findall("Probe")), + len(probes.probedata['bar.example.com'])) + self.assertEqual(len(bardata.findall("Group")), + len(probes.cgroups['bar.example.com'])) + empty = bardata.find("Probe[@name='empty']") + self.assertIsNotNone(empty) + self.assertIsNotNone(empty.get("value")) + self.assertEqual(empty.get("value"), "") + if HAS_JSON: + jdata = bardata.find("Probe[@name='json']") + self.assertIsNotNone(jdata) + self.assertIsNotNone(jdata.get("value")) + self.assertItemsEqual(test_data, + json.loads(jdata.get("value"))) + if HAS_YAML: + ydata = bardata.find("Probe[@name='yaml']") + self.assertIsNotNone(ydata) + self.assertIsNotNone(ydata.get("value")) + self.assertItemsEqual(test_data, + yaml.load(ydata.get("value"))) + + inner() @skipUnless(HAS_DJANGO, "Django not found, skipping") def test__write_data_db(self): @@ -414,18 +466,24 @@ text probes._load_data_db.assert_any_call() self.assertFalse(probes._load_data_xml.called) - @patch("%s.open" % builtins) @patch("lxml.etree.parse") - def test__load_data_xml(self, mock_parse, mock_open): + def test__load_data_xml(self, mock_parse): probes = self.get_probes_object(use_db=False) - # to get the value for lxml.etree.parse to parse, we call - # _write_data_xml, mock the open() call, and grab the data - # that gets "written" to probed.xml probes.probedata = self.get_test_probedata() probes.cgroups = self.get_test_cgroups() - probes._write_data_xml(None) - xdata = \ - lxml.etree.XML(str(mock_open.return_value.write.call_args[0][0])) + + # to get the value for lxml.etree.parse to parse, we call + # _write_data_xml, mock the lxml.etree._ElementTree.write() + # call, and grab the data that gets "written" to probed.xml + @patch("lxml.etree.Element") + @patch("lxml.etree.SubElement", StoringSubElement()) + def inner(mock_Element): + mock_Element.side_effect = StoringElement() + probes._write_data_xml(None) + top = mock_Element.side_effect.return_value + return top._element + + xdata = inner() mock_parse.return_value = xdata.getroottree() probes.probedata = dict() probes.cgroups = dict() |