From 0f625983df42d348e259459db9549f209440bd51 Mon Sep 17 00:00:00 2001 From: terminaldweller Date: Sun, 25 Dec 2022 17:51:48 +0330 Subject: update --- bin/virttop | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100755 bin/virttop (limited to 'bin/virttop') diff --git a/bin/virttop b/bin/virttop new file mode 100755 index 0000000..4310d5d --- /dev/null +++ b/bin/virttop @@ -0,0 +1,241 @@ +#!/usr/bin/env python +"""virt top""" + +# ideally we would like to use the monkeypatch but it is untested +# and experimental +# import defusedxml # type:ignore +# defusedxml.defuse_stdlib() +import argparse +import subprocess +import time +import typing +from defusedxml import ElementTree # type:ignore +from defusedxml import minidom +import libvirt # type:ignore + +hv = ["qemu:///system"] + + +class Argparser: # pylint: disable=too-few-public-methods + """Argparser class.""" + + def __init__(self): + self.parser = argparse.ArgumentParser() + self.parser.add_argument( + "--delay", + "-d", + type=float, + help="the delay between updates", + default=5, + ) + self.args = self.parser.parse_args() + + +# pylint: disable=too-few-public-methods +class Colors: + """static color definitions""" + + purple = "\033[95m" + blue = "\033[94m" + green = "\033[92m" + yellow = "\033[93m" + red = "\033[91m" + grey = "\033[1;37m" + darkgrey = "\033[1;30m" + cyan = "\033[1;36m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + blueblue = "\x1b[38;5;24m" + greenie = "\x1b[38;5;23m" + goo = "\x1b[38;5;22m" + + +def get_network_info(xml) -> typing.Dict[str, str]: + """returns the network info""" + result_dict = {} + interface_types = xml.getElementsByTagName("interface") + for interface_type in interface_types: + interface_nodes = interface_type.childNodes + for interface_node in interface_nodes: + if interface_node.nodeName[0:1] != "#": + for attr in interface_node.attributes.keys(): + result_dict[ + interface_node.attributes[attr].name + ] = interface_node.attributes[attr].value + return result_dict + + +def get_ip_by_arp(mac: str) -> str: + """get ip adress using arp""" + # arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}' + try: + proc1 = subprocess.run(["arp", "-n"], capture_output=True, check=True) + proc2 = subprocess.run( + ["grep", mac], input=proc1.stdout, capture_output=True, check=True + ) + proc3 = subprocess.run( + ["awk", "{print $1}"], + input=proc2.stdout, + capture_output=True, + check=True, + ) + ip_address = proc3.stdout.decode("utf-8").strip("\n") + except: + ip_address = "N/A" + return ip_address + + +def get_disk_info(xml) -> typing.Dict[str, str]: + """returns the disk info""" + result_dict: typing.Dict = {} + disk_types = xml.getElementsByTagName("disk") + for disk_type in disk_types: + disk_nodes = disk_type.childNodes + for disk_node in disk_nodes: + if disk_node.nodeName[0:1] != "#": + for attr in disk_node.attributes.keys(): + result_dict[ + disk_node.attributes[attr].name + ] = disk_node.attributes[attr].value + + return result_dict + + +def ffs(offset, header_list, numbered, *args): + """A simple columnar printer""" + max_column_width = [] + lines = [] + numbers_f = [] + dummy = [] + + if numbered: + numbers_f.extend(range(1, len(args[-1]) + 1)) + max_column_width.append( + max([len(repr(number)) for number in numbers_f]) + ) + header_list.insert(0, "idx") + + for arg in args: + max_column_width.append(max([len(repr(argette)) for argette in arg])) + + index = range(0, len(header_list)) + for header, width, i in zip(header_list, max_column_width, index): + max_column_width[i] = max(len(header), width) + offset + + for i in index: + dummy.append( + Colors.greenie + + Colors.BOLD + + header_list[i].ljust(max_column_width[i]) + + Colors.ENDC + ) + lines.append("".join(dummy)) + dummy.clear() + + index2 = range(0, len(args[-1])) + for i in index2: + if numbered: + dummy.append( + Colors.goo + + Colors.BOLD + + repr(i).ljust(max_column_width[0]) + + Colors.ENDC + ) + for arg, width in zip(args, max_column_width[1:]): + dummy.append( + Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC + ) + else: + for arg, width in zip(args, max_column_width): + dummy.append( + Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC + ) + lines.append("".join(dummy)) + dummy.clear() + return lines + + +def size_abr(num: float, shift_by: float) -> str: + num = num * shift_by + if num < 1000: + return repr(num) + elif num < 1_000_000.0: + return repr(num / 1_000) + " KB" + elif num < 1_000_000_000: + return repr(num / 1_000_000) + " MB" + elif num < 1_000_000_000_000: + return repr(num / 1_000_000_000) + " GB" + return "N/A" + + +def main() -> None: + """entrypoint""" + # argparser = Argparser() + name = [] + cpu_times = [] + mem_actual = [] + mem_unused = [] + write_bytes = [] + read_bytes = [] + macs = [] + ips = [] + for hv_host in hv: + conn = libvirt.openReadOnly(hv_host) + active_hosts = conn.listDomainsID() + for host_id in active_hosts: + dom = conn.lookupByID(host_id) + cpu_times.append( + repr( + int( + dom.getCPUStats(total=True)[0]["cpu_time"] + / 1_000_000_000 + ) + ) + + " s" + ) + raw_xml = dom.XMLDesc() + xml = minidom.parseString(raw_xml) + name.append(dom.name()) + mem_stats = dom.memoryStats() + mem_actual.append(size_abr(mem_stats["actual"], 1000)) + mem_unused.append(size_abr(mem_stats["unused"], 1000)) + tree = ElementTree.fromstring(dom.XMLDesc()) + iface = tree.find("devices/interface/target").get("dev") + stats = dom.interfaceStats(iface) + write_bytes.append(size_abr(stats[4], 1)) + read_bytes.append(size_abr(stats[0], 1)) + get_disk_info(xml) + network_info = get_network_info(xml) + macs.append(network_info["address"]) + ips.append(get_ip_by_arp(network_info["address"])) + + lines = ffs( + 3, + [ + "name", + "cpu", + "mem_actual", + "mem_unused", + "write_B", + "read_B", + "MAC", + "IP", + ], + True, + name, + cpu_times, + mem_actual, + mem_unused, + write_bytes, + read_bytes, + macs, + ips, + ) + for line in lines: + print(line) + # time.sleep(argparser.args.delay) + + +if __name__ == "__main__": + main() -- cgit v1.2.3