#!/usr/bin/env python
"""virt top"""
# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml # type:ignore
# defusedxml.defuse_stdlib()
import argparse
import subprocess
import time
import typing

from defusedxml import ElementTree  # type:ignore
from defusedxml import minidom
import libvirt  # type:ignore

# Connection URIs handed to libvirt, one per hypervisor to inspect.
hv = ["qemu:///system"]


class Argparser:  # pylint: disable=too-few-public-methods
    """Parse the command line once and expose the result as ``self.args``."""

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            "--delay",
            "-d",
            type=float,
            help="the delay between updates",
            default=5,
        )
        self.args = self.parser.parse_args()


# pylint: disable=too-few-public-methods
class Colors:
    """Static terminal escape sequences used to colour the output."""

    purple = "\033[95m"
    blue = "\033[94m"
    green = "\033[92m"
    yellow = "\033[93m"
    red = "\033[91m"
    grey = "\033[1;37m"
    darkgrey = "\033[1;30m"
    cyan = "\033[1;36m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    blueblue = "\x1b[38;5;24m"
    greenie = "\x1b[38;5;23m"
    goo = "\x1b[38;5;22m"


def get_network_info(xml) -> typing.Dict[str, str]:
    """Flatten the attributes of every ``<interface>`` child into one dict.

    ``xml`` is a parsed DOM document. For each ``<interface>`` element, the
    attributes of its direct child elements are copied into the result,
    keyed by attribute name. Attributes sharing a name overwrite each other,
    so the last one encountered in document order wins.
    """
    result_dict: typing.Dict[str, str] = {}
    for interface in xml.getElementsByTagName("interface"):
        for child in interface.childNodes:
            # Text and comment nodes have names such as "#text" and carry
            # no attributes; only real elements are of interest.
            if child.nodeName.startswith("#"):
                continue
            for attr_name, attr_value in child.attributes.items():
                result_dict[attr_name] = attr_value
    return result_dict


def get_ip_by_arp(mac: str) -> str:
    """Return the IP address listed for ``mac`` in the output of ``arp -n``.

    The first whitespace-separated field of every output line that mentions
    ``mac`` (compared case-insensitively) is collected. Several matches are
    joined with newlines. ``"N/A"`` is returned when ``arp`` cannot be run,
    exits with an error, or prints no line containing ``mac``.
    """
    try:
        proc = subprocess.run(["arp", "-n"], capture_output=True, check=True)
    except (OSError, subprocess.SubprocessError):
        # Best effort only: a missing or failing arp binary must not abort
        # the caller, but unrelated exceptions (KeyboardInterrupt, bugs)
        # are deliberately no longer swallowed.
        return "N/A"

    needle = mac.lower()
    addresses = []
    for line in proc.stdout.decode("utf-8", errors="replace").splitlines():
        fields = line.split()
        if fields and needle in line.lower():
            addresses.append(fields[0])
    return "\n".join(addresses) if addresses else "N/A"
def get_disk_info(xml) -> typing.Dict[str, str]:
    """Flatten the attributes of every ``<disk>`` child into one dict.

    ``xml`` is a parsed DOM document. For each ``<disk>`` element, the
    attributes of its direct child elements are copied into the result,
    keyed by attribute name. Attributes sharing a name overwrite each other,
    so the last one encountered in document order wins.
    """
    result_dict: typing.Dict[str, str] = {}
    for disk in xml.getElementsByTagName("disk"):
        for child in disk.childNodes:
            # Text and comment nodes have names such as "#text" and carry
            # no attributes; only real elements are of interest.
            if child.nodeName.startswith("#"):
                continue
            for attr_name, attr_value in child.attributes.items():
                result_dict[attr_name] = attr_value
    return result_dict


def ffs(offset, header_list, numbered, *args):
    """A simple columnar printer.

    Args:
        offset: number of padding characters added to every column width.
        header_list: column titles, one per column in ``args``. The list is
            not modified.
        numbered: when true, an extra leading ``idx`` column is emitted.
        *args: the columns, each a sequence of cell values. The number of
            rows is taken from the last column. Cells are rendered with
            ``repr``.

    Returns:
        A list of coloured strings: the header line followed by one line
        per row. With empty columns only the header line is returned.
    """
    # Work on a copy so the caller's list does not grow an "idx" entry on
    # every call.
    headers = list(header_list)
    row_count = len(args[-1]) if args else 0

    widths = []
    if numbered:
        # NOTE(review): the width is sized for the labels 1..row_count while
        # the rows below are labelled 0..row_count-1; kept as found.
        widths.append(len(repr(row_count)) if row_count else 0)
        headers.insert(0, "idx")
    for column in args:
        # default=0 keeps an empty column from raising ValueError.
        widths.append(max((len(repr(cell)) for cell in column), default=0))

    # Each titled column is as wide as its widest cell or its title,
    # whichever is longer, plus the requested padding.
    for i, header in enumerate(headers):
        if i < len(widths):
            widths[i] = max(len(header), widths[i]) + offset
        else:
            widths.append(len(header) + offset)

    lines = [
        "".join(
            Colors.greenie + Colors.BOLD + header.ljust(width) + Colors.ENDC
            for header, width in zip(headers, widths)
        )
    ]

    data_widths = widths[1:] if numbered else widths
    for i in range(row_count):
        cells = []
        if numbered:
            cells.append(
                Colors.goo
                + Colors.BOLD
                + repr(i).ljust(widths[0])
                + Colors.ENDC
            )
        cells.extend(
            Colors.blueblue + repr(column[i]).ljust(width) + Colors.ENDC
            for column, width in zip(args, data_widths)
        )
        lines.append("".join(cells))
    return lines


# Decimal size tiers: (divisor, suffix). A value belongs to the first tier
# whose divisor is more than a thousandth of it.
_SIZE_TIERS = (
    (1_000, "KB"),
    (1_000_000, "MB"),
    (1_000_000_000, "GB"),
    (1_000_000_000_000, "TB"),
)


def size_abr(num: float, shift_by: float) -> str:
    """Abbreviate ``num * shift_by`` using decimal (power of 1000) units.

    Values below 1000 are returned as their bare ``repr`` with no suffix.
    Larger values are divided down and suffixed with KB, MB, GB or TB.
    Values of 10**15 or more yield ``"N/A"``.
    """
    num = num * shift_by
    if num < 1000:
        return repr(num)
    for divisor, suffix in _SIZE_TIERS:
        if num < divisor * 1_000:
            return repr(num / divisor) + " " + suffix
    return "N/A"


def _domain_row(dom) -> typing.List[str]:
    """Collect the table cells for one libvirt domain.

    Cells are returned in the column order used by ``main``. A statistic
    the domain does not provide is shown as ``"N/A"``.
    """
    # The raw cpu_time is scaled by 1e-9 and labelled as seconds.
    cpu_seconds = int(
        dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000
    )
    raw_xml = dom.XMLDesc()

    # NOTE(review): memory values are scaled by 1000 before abbreviation;
    # confirm against the libvirt documentation whether 1024 is intended.
    mem_stats = dom.memoryStats()
    mem_cells = [
        size_abr(mem_stats[key], 1000) if key in mem_stats else "N/A"
        for key in ("actual", "unused")
    ]

    # Traffic counters are read from the first interface that has a target
    # device; a domain without one gets placeholders.
    target = ElementTree.fromstring(raw_xml).find("devices/interface/target")
    iface = target.get("dev") if target is not None else None
    if iface is None:
        written = read = "N/A"
    else:
        stats = dom.interfaceStats(iface)
        written = size_abr(stats[4], 1)
        read = size_abr(stats[0], 1)

    mac = get_network_info(minidom.parseString(raw_xml)).get("address")
    if mac is None:
        mac = ip_address = "N/A"
    else:
        ip_address = get_ip_by_arp(mac)

    return [
        dom.name(),
        repr(cpu_seconds) + " s",
        mem_cells[0],
        mem_cells[1],
        written,
        read,
        mac,
        ip_address,
    ]


def main() -> None:
    """Entrypoint: print one table row per running domain.

    Every URI in ``hv`` is opened read-only, its active domains are
    inspected, and the connection is closed again even when a query fails.
    """
    # argparser = Argparser()
    headers = [
        "name",
        "cpu",
        "mem_actual",
        "mem_unused",
        "write_B",
        "read_B",
        "MAC",
        "IP",
    ]
    rows = []
    for hv_host in hv:
        conn = libvirt.openReadOnly(hv_host)
        try:
            for host_id in conn.listDomainsID():
                rows.append(_domain_row(conn.lookupByID(host_id)))
        finally:
            conn.close()

    # ffs expects one sequence per column, so transpose the collected rows.
    columns = [[row[i] for row in rows] for i in range(len(headers))]
    for line in ffs(3, headers, True, *columns):
        print(line)
    # time.sleep(argparser.args.delay)


if __name__ == "__main__":
    main()