1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
"""Debconf Support for Bcfg2"""
import subprocess
import Bcfg2.Options
import Bcfg2.Client.Tools
class Debconf(Bcfg2.Client.Tools.Tool):
    """Debconf Support for Bcfg2.

    Handles ``Conf`` entries of type ``debconf``.  Values are read and
    written by talking to a ``/usr/bin/debconf-communicate`` child
    process over pipes; extra (unmanaged) entries are discovered with
    ``/usr/bin/debconf-show``.
    """
    name = 'Debconf'
    __execs__ = ['/usr/bin/debconf-communicate', '/usr/bin/debconf-show']
    __handles__ = [('Conf', 'debconf')]
    __req__ = {'Conf': ['name']}

    def __init__(self, config):
        Bcfg2.Client.Tools.Tool.__init__(self, config)

        #: This is the reference to the Popen object of the
        #: running debconf-communicate process. If this is None,
        #: no process is running.
        self.debconf = None

    def _start_debconf(self):
        """Spawn debconf-communicate unless one is already running."""
        if self.debconf is None:
            # universal_newlines=True gives text-mode pipes, so the
            # str commands written in _debconf_reply() and the str
            # handling of the answer also work on Python 3 (where the
            # pipes would otherwise carry bytes).
            self.debconf = subprocess.Popen(
                ['/usr/bin/debconf-communicate'],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                universal_newlines=True)

    def _stop_debconf(self):
        """Shut down the debconf-communicate process, if any."""
        if self.debconf is not None:
            proc = self.debconf
            # Forget the process first so that a failure below cannot
            # leave a half-closed process registered on the tool.
            self.debconf = None
            proc.stdin.close()
            proc.stdout.close()
            # Reap the child; without this it lingers as a zombie.
            proc.wait()

    def _debconf_reply(self, msg):
        """Send one command to debconf-communicate and read the answer.

        :param msg: the complete command line, including the trailing
                    newline
        :returns: tuple ``(success, result)`` - ``success`` is True if
                  the first word of the reply line is ``0``;
                  ``result`` is the remainder of the reply line, or
                  None if the reply consisted of a single word
        """
        if self.debconf is None:
            self._start_debconf()
        self.logger.debug('Debconf: %s' % msg.strip())
        self.debconf.stdin.write(msg)
        # The pipe may be buffered; the command has to reach the child
        # before we block waiting for its reply.
        self.debconf.stdin.flush()
        line = self.debconf.stdout.readline().rstrip('\n')
        self.logger.debug('< %s' % line)
        reply = line.split(' ', 1)
        result = None
        if len(reply) > 1:
            result = reply[1]
        return (reply[0] == '0', result)

    def debconf_get(self, key):
        """Query the value and the ``seen`` flag of a debconf key.

        :returns: tuple ``(seen, value)`` - ``(False, None)`` if the
                  GET command failed; otherwise ``seen`` tells whether
                  the ``seen`` flag of the key is ``true``
        """
        (success, value) = self._debconf_reply('GET %s\n' % key)
        if not success:
            return (False, None)
        (_, seen) = self._debconf_reply('FGET %s seen\n' % key)
        return (seen == 'true', value)

    def debconf_set(self, key, value):
        """Set a debconf key and, on success, flag it as seen.

        :returns: True if the SET command succeeded
        """
        (success, _) = self._debconf_reply('SET %s %s\n' % (key, value))
        if success:
            self._debconf_reply('FSET %s seen true\n' % key)
        return success

    def debconf_reset(self, key):
        """Issue a RESET for a debconf key.

        :returns: True if the RESET command succeeded
        """
        (success, _) = self._debconf_reply('RESET %s\n' % key)
        return success

    def VerifyConf(self, entry, _modlist):
        """ Verify the given Debconf entry. """
        if entry.get('ignore', 'false').lower() == 'true':
            return True

        (seen, current_value) = self.debconf_get(entry.get('name'))
        if not seen:
            # Make it visible in the entry that the value was never
            # seen; the annotated string can not match the comparison
            # below either way, as "seen" is False.
            current_value = '%s (not seen)' % current_value
        entry.set('current_value', current_value)
        return seen and current_value == entry.get('value')

    def InstallConf(self, entry):
        """ Install the given Debconf entry. """
        return self.debconf_set(entry.get('name'), entry.get('value'))

    def Inventory(self, structures=None):
        # Delegate to the base class, but always shut down the
        # debconf-communicate process that the verification started.
        try:
            result = Bcfg2.Client.Tools.Tool.Inventory(self, structures)
        finally:
            self._stop_debconf()
        return result
    Inventory.__doc__ = Bcfg2.Client.Tools.Tool.Inventory.__doc__

    def Install(self, entries):
        # Delegate to the base class, but always shut down the
        # debconf-communicate process that the installation started.
        try:
            result = Bcfg2.Client.Tools.Tool.Install(self, entries)
        finally:
            self._stop_debconf()
        return result
    Install.__doc__ = Bcfg2.Client.Tools.Tool.Install.__doc__

    def Remove(self, entries):
        try:
            for entry in entries:
                self.debconf_reset(entry.get('name'))
                # append() records the entry itself; "+=" would extend
                # the list with whatever iterating the entry yields
                # (its child elements), i.e. usually nothing at all.
                # NOTE(review): assumes self.modified is a list, as
                # set up by the base class - confirm.
                self.modified.append(entry)
        finally:
            self._stop_debconf()

        self.extra = self.FindExtra()
    Remove.__doc__ = Bcfg2.Client.Tools.Tool.Remove.__doc__

    def FindExtra(self):
        # Names that are part of the specification; everything else
        # reported by debconf-show is a candidate for "extra".
        specified = set(entry.get('name')
                        for entry in self.getSupportedEntries())
        extra = dict()
        listowners = self.cmd.run(['/usr/bin/debconf-show', '--listowners'])
        if listowners.success:
            owners = listowners.stdout.splitlines()

            values = self.cmd.run(['/usr/bin/debconf-show'] + owners)
            for line in values.stdout.splitlines():
                # Only lines starting with "*" are considered.
                if len(line) > 2 and line[0] == '*':
                    (name, colon, current_value) = line[2:].partition(':')
                    if not colon:
                        # Not a "name: value" line; nothing to record.
                        continue
                    if name not in specified and name not in extra:
                        # current_value[1:] drops the character that
                        # follows the colon (the separating blank).
                        extra[name] = Bcfg2.Client.XML.Element(
                            'Conf', name=name, type='debconf',
                            current_value=current_value[1:])
        return list(extra.values())
    FindExtra.__doc__ = Bcfg2.Client.Tools.Tool.FindExtra.__doc__
|