diff options
author | Jonah BrĂ¼chert <jbb@kaidan.im> | 2024-04-16 18:43:06 +0200 |
---|---|---|
committer | Jonah BrĂ¼chert <jbb@kaidan.im> | 2024-04-16 18:43:06 +0200 |
commit | 3fcbc1586ef863ef7f503fd35df3f59c3df4b9f6 (patch) | |
tree | 9ea3b49f171f79df8a72d0373c25141784c8c6b2 | |
parent | b695b5b3b12448b501686a108440787fa2395ac7 (diff) | |
parent | 9f6e876e14a074c386660c582c7f78e1503877aa (diff) | |
download | tools-debian.tar.gz tools-debian.tar.bz2 tools-debian.zip |
Merge branch 'master' into debiandebian
-rw-r--r-- | .gitignore | 1 | ||||
-rwxr-xr-x | bin/hostinfo | 55 | ||||
-rw-r--r-- | hostinfo/prefix.py | 11 | ||||
-rw-r--r-- | hostinfo/printer.py | 89 | ||||
-rwxr-xr-x | setup.py | 6 | ||||
-rw-r--r-- | version.py | 54 |
6 files changed, 83 insertions, 133 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..751553b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.bak diff --git a/bin/hostinfo b/bin/hostinfo index 8700be7..639102c 100755 --- a/bin/hostinfo +++ b/bin/hostinfo @@ -8,6 +8,8 @@ import os import pkg_resources from dns import resolver, reversename +from typing import Optional + OWN_DIRECTORY = os.path.dirname(os.path.abspath(os.path.realpath(__file__))) LIB = os.path.join(OWN_DIRECTORY, '..') if os.path.exists(os.path.join(LIB, 'hostinfo')): @@ -16,9 +18,11 @@ if os.path.exists(os.path.join(LIB, 'hostinfo')): from hostinfo import printer from hostinfo import utils + def _get_data(path): - stream = file(path, 'r') - return yaml.load(stream) + stream = open(path, 'r') + return yaml.safe_load(stream) + def _match_key(data, keys): if data is None: @@ -53,6 +57,7 @@ def _match_key(data, keys): return None + def _match(host, search_key, search_value, negate): search_keys = search_key.split('.') @@ -71,6 +76,7 @@ def _match(host, search_key, search_value, negate): return (search_key, result) return (search_key, None) + def _parse_search(search): if search[0] != '?': sys.stderr.write("Invalid search string.") @@ -90,10 +96,13 @@ def _parse_search(search): return (search, value, negate) -def print_search(basepath, flags, search, filter_key=None): + +def print_search(basepath: str, flags: argparse.Namespace, + search: str, filter_key: Optional[str] = None): def _get_label(host): if flags.short: - return host.replace('.spline.inf.fu-berlin.de','') + return host.replace('.spline.inf.fu-berlin.de', + '') return host metadata = os.path.join(basepath, 'metadata', 'hosts') @@ -110,7 +119,7 @@ def print_search(basepath, flags, search, filter_key=None): key, result = _match(data, search_key, search_value, negate) if result is not None: if flags.only_names: - print(_get_label(host)) + print((_get_label(host))) continue p = printer.Printer(data, flags) @@ -119,30 +128,32 @@ def print_search(basepath, flags, search, filter_key=None): maxlength=max(length), force=True) else: if key is None: - print(_get_label(host)) + print((_get_label(host))) else: p.info(key, label=_get_label(host), maxlength=max(length), force=True) + def print_info(path, flags, key=None): data = _get_data(path) p = printer.Printer(data, flags) p.info(key) + def print_keys(path): def _print_keys(data, prefix = ''): if isinstance(data, str): return - for key in data.keys(): - print "%s%s" % (prefix, key) + for key in list(data.keys()): + print("%s%s" % (prefix, key)) if key == 'addresses': - for k in utils.group_by(data[key], 'interface').keys(): - print "%s%s.%s" % (prefix, key, k) + for k in list(utils.group_by(data[key], 'interface').keys()): + print("%s%s.%s" % (prefix, key, k)) elif key == 'ports': - for k in utils.group_by(data[key], 'process', 'UNKNOWN').keys(): - print "%s%s.%s" % (prefix, key, k) + for k in list(utils.group_by(data[key], 'process', 'UNKNOWN').keys()): + print("%s%s.%s" % (prefix, key, k)) elif isinstance(data[key], dict): _print_keys(data[key], "%s%s." % (prefix, key)) elif isinstance(data[key], list): @@ -152,14 +163,15 @@ def print_keys(path): data = _get_data(path) _print_keys(data) -def print_hosts(path, short): + +def print_hosts(path: str, short: bool): metadata = os.path.join(path, 'metadata', 'hosts') if os.path.exists(metadata): - hosts = yaml.load(file(metadata, 'r')) + hosts = yaml.safe_load(open(metadata, 'r')) if 'hosts' in hosts: for host in hosts['hosts']: if short: - print(host.replace('.spline.inf.fu-berlin.de','')) + print((host.replace('.spline.inf.fu-berlin.de',''))) else: print(host) return True @@ -167,7 +179,8 @@ def print_hosts(path, short): sys.stderr.write("'%s' not found!\n" % metadata) return False -def find_host(basepath, host): + +def find_host(basepath: str, host: str): path = os.path.join(basepath, host) if os.path.exists(path): return path @@ -191,6 +204,7 @@ def find_host(basepath, host): return None + def print_version_and_exit(): ver = None try: @@ -202,10 +216,11 @@ def print_version_and_exit(): if ver is None: sys.stderr.write('Unable to identify the version information.') sys.exit(1) - print("hostinfo-tools %s" % ver) + print(("hostinfo-tools %s" % ver)) sys.exit(0) -def main(): + +def main() -> None: basepath = '/usr/local/share/hostinfo' if 'HOSTINFO_PATH' in os.environ and os.environ['HOSTINFO_PATH'] != '': basepath = os.environ['HOSTINFO_PATH'] @@ -256,7 +271,8 @@ def main(): if args.name.startswith('?'): # search - print_search(basepath, search=args.name, filter_key=args.filter, flags=args) + print_search(basepath, search=args.name, filter_key=args.filter, + flags=args) else: # info path = find_host(basepath, args.name) @@ -273,5 +289,6 @@ def main(): sys.exit(0) + if __name__ == '__main__': main() diff --git a/hostinfo/prefix.py b/hostinfo/prefix.py index e1b72b5..7233c48 100644 --- a/hostinfo/prefix.py +++ b/hostinfo/prefix.py @@ -1,9 +1,12 @@ # -*- coding: utf-8 -*- -from __future__ import print_function +class Flags: + oneline: bool = False + nospaces: bool = False + class Printer: - flags = list() + flags = Flags() def __init__(self, full_key='', printer=None): if printer is None: @@ -20,7 +23,7 @@ class Printer: def set_label(self, label='', maxlength=0): self.label = self._get_label(label, maxlength) - def _get_label(self, label, maxlength): + def _get_label(self, label: str, maxlength: int): if label == '': return label @@ -33,7 +36,7 @@ class Printer: else: return label.ljust(maxlength+2) - def pprint(self, data): + def pprint(self, data: str): self.output("%s%s" % (self.label, data)) self.has_output = True if not self.empty: diff --git a/hostinfo/printer.py b/hostinfo/printer.py index ea612a5..4054df2 100644 --- a/hostinfo/printer.py +++ b/hostinfo/printer.py @@ -1,27 +1,33 @@ # -*- coding: utf-8 -*- -from __future__ import absolute_import + from hostinfo import prefix from hostinfo import utils +from typing import Iterable, Optional, Any, Union + + def _get_full_key(prev_key, key): if prev_key == '': return key return "%s.%s" % (prev_key, key) -def _sort_with_list(iterable, sort): - def helper(value): + +def _sort_with_list(iterable: Iterable, sort: list[str]): + def helper(value) -> str: if sort is None: return value (key, _) = value if key in sort: - return sort.index(key) + return str(sort.index(key)) return "%d %s" % (len(sort), key) return sorted(iterable, key=helper) -def _space(filter_key, full_key, printer, force=False): + +def _space(filter_key: Optional[str], full_key: str, + printer: prefix.Printer, force=False): if filter_key is None and full_key == '': printer.space(force) @@ -46,49 +52,21 @@ class Printer: self.flags = flags prefix.Printer.flags = flags - def cb_print_addresses(self, value, full_key, filter_key): - def _print_ip(address): - return '%s/%s' % (address['address'], address['netmask']) - - display_check = self._is_group_displayed(full_key, filter_key) - return utils.group_by(value, 'interface', None, display_check, _print_ip) - - def cb_print_ports(self, value, full_key, filter_key): - def _print_port(port): - if port['proto'] in ['tcp6', 'udp6']: - return '(%s) [%s]:%s' % (port['proto'].replace('6', ''), - port['ip'], port['port']) - return '(%s) %s:%s' % (port['proto'], port['ip'], port['port']) - - display_check = self._is_group_displayed(full_key, filter_key) - return (utils.group_by(value, 'process', 'UNKNOWN', display_check, _print_port), - ['sshd', 'nrpe', 'munin-node']) - - def cb_print_vserver(self, value, full_key, filter_key): - if value == 'guest' and 'vserver_host' in self.data and \ - self.data['vserver_host'] is not None: - return 'guest running on %s' % self.data['vserver_host'] - else: - return value - - def _should_display(self, full_key, filter_key=None): + def _should_display(self, full_key, + filter_key: Optional[str] = None): if full_key not in self.ignore: return True if filter_key is not None and filter_key.startswith(full_key): return True return False - def _is_group_displayed(self, prev_key, filter_key): - return (lambda key: self._should_display(_get_full_key(prev_key, key), - filter_key)) - - def _print(self, value, printer, filter_key=None, sort=None, force=False): - try: - value = value.strip().splitlines() - except AttributeError: - pass - - if isinstance(value, dict): + def _print(self, value: Union[str, dict[str, Any], list[str]], + printer: prefix.Printer, + filter_key: Optional[str] = None, + sort=None, force=False): + if isinstance(value, str): + self._print_list(value.strip().splitlines(), printer, filter_key) + elif isinstance(value, dict): self._print_dict(value, printer, filter_key, sort, force) elif isinstance(value, list): self._print_list(value, printer, filter_key) @@ -98,7 +76,8 @@ class Printer: else: self._print_value(value, printer, filter_key) - def _print_key(self, key, value, printer, filter_key): + def _print_key(self, key: str, value: str, + printer: prefix.Printer, filter_key): sort = None try: method = getattr(self, 'cb_print_%s' % key.replace('.', '_')) @@ -113,29 +92,33 @@ class Printer: self._print(value, printer, filter_key, sort) - def _print_value(self, value, printer, filter_key): + def _print_value(self, value: str, printer: prefix.Printer, + filter_key: Optional[str]): full_key = _get_full_key(printer.full_key, value) if self._should_display(full_key, filter_key) and \ filter_key is None or full_key == filter_key: printer.pprint(value) - def _print_list(self, values, printer, filter_key): + def _print_list(self, values: list[str], + printer: prefix.Printer, + filter_key: Optional[str]): for value in values: - if isinstance(value, basestring): + if isinstance(value, str): self._print_value(value, printer, filter_key) else: self._print(value, printer, filter_key) - def _print_dict(self, value, printer, filter_key, sort, force): + def _print_dict(self, value: dict[str, str], printer: prefix.Printer, + filter_key: Optional[str], sort: list, force: bool): keys = _sort_with_list( - [(key, full_key) for key in value.keys() + [(key, full_key) for key in list(value.keys()) for full_key in [_get_full_key(printer.full_key, key)] if self._should_display(full_key, filter_key)], sort) if len(keys) == 0: return - maxlength = max([len(self._get_label(key, full_key)) \ + maxlength = max([len(self._get_label(key, full_key)) for (key, full_key) in keys]) _space(filter_key, printer.full_key, printer, True) @@ -152,7 +135,8 @@ class Printer: found = True new_printer = prefix.Printer(full_key, printer) if filter_key is None: - new_printer.set_label(self._get_label(key, full_key), maxlength) + new_printer.set_label(self._get_label(key, full_key), + maxlength) self._print_key(full_key, value[key], new_printer, new_filter_key) _space(filter_key, printer.full_key, new_printer) @@ -160,12 +144,13 @@ class Printer: if force and not found: printer.pprint('') - def _get_label(self, key, full_key): + def _get_label(self, key: str, full_key: str): if full_key in self.labels: return self.labels[full_key] return key - def info(self, key, label=None, maxlength=0, force=False): + def info(self, key: Optional[str], label: Optional[str] = None, + maxlength=0, force=False): printer = prefix.Printer() if label is not None: printer.set_label(label, maxlength) @@ -1,14 +1,12 @@ #!/usr/bin/env python from distutils.core import setup -from version import * setup(name='hostinfo-tools', - version=get_git_version(), + version="0.2.3", description='Hostinfo database interface scripts', author='Alexander Sulfrian', author_email='alex@spline.inf.fu-berlin.de', url='http://git.spline.inf.fu-berlin.de/hostinfo-tools/', packages=['hostinfo'], - scripts=['bin/hostinfo'], - ) + scripts=['bin/hostinfo']) diff --git a/version.py b/version.py deleted file mode 100644 index 08bc587..0000000 --- a/version.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -# To use this script, simply import it your setup.py file, and use the -# results of get_git_version() as your package version: -# -# from version import * -# -# setup( -# version=get_git_version(), -# . -# . -# . -# ) - -__all__ = ["get_git_version"] - -import os -import re -from subprocess import Popen, PIPE - -OWN_DIR = os.path.dirname(os.path.abspath(os.path.realpath(__file__))) - -def call_git_describe(abbrev=4): - try: - p = Popen(['git', 'describe', '--abbrev=%d' % abbrev, '--tags'], - cwd=OWN_DIR, stdout=PIPE, stderr=PIPE) - p.stderr.close() - line = p.stdout.readlines()[0] - return line.strip() - - except: - return None - -def parse_debian_changelog(): - try: - from debian import changelog - - with open(os.path.join(OWN_DIR, 'debian', 'changelog')) as cfile: - clog = changelog.Changelog(cfile) - return str(clog.get_version()) - except: - return None - -def get_git_version(abbrev=4): - version = call_git_describe(abbrev) - if version is None: - version = parse_debian_changelog() - - if version is None: - raise ValueError("Cannot find the version number!") - - return re.sub('^debian/', '', version) - -if __name__ == "__main__": - print get_git_version() |