#!/usr/bin/env python """virt top""" # ideally we would like to use the monkeypatch but it is untested # and experimental # import defusedxml # type:ignore # defusedxml.defuse_stdlib() import argparse import dataclasses import subprocess # import time import typing from xml.dom.minidom import Document from defusedxml import ElementTree # type:ignore from defusedxml import minidom import libvirt # type:ignore class Argparser: # pylint: disable=too-few-public-methods """Argparser class.""" def __init__(self): self.parser = argparse.ArgumentParser() self.parser.add_argument( "--delay", "-d", type=float, help="The delay between updates", default=5, ) self.parser.add_argument( "--uri", "-u", nargs="+", type=str, help="A list of URIs to connect to seperated by commas", default=["qemu:///system"], ) self.args = self.parser.parse_args() # pylint: disable=too-few-public-methods class Colors: """static color definitions""" purple = "\033[95m" blue = "\033[94m" green = "\033[92m" yellow = "\033[93m" red = "\033[91m" grey = "\033[1;37m" darkgrey = "\033[1;30m" cyan = "\033[1;36m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" blueblue = "\x1b[38;5;24m" greenie = "\x1b[38;5;23m" goo = "\x1b[38;5;22m" @dataclasses.dataclass # pylint: disable=too-many-instance-attributes class VirtData: """Holds the data that we collect to display to the user""" name: typing.List[str] = dataclasses.field(default_factory=list) cpu_times: typing.List[str] = dataclasses.field(default_factory=list) mem_actual: typing.List[str] = dataclasses.field(default_factory=list) mem_unused: typing.List[str] = dataclasses.field(default_factory=list) write_bytes: typing.List[str] = dataclasses.field(default_factory=list) read_bytes: typing.List[str] = dataclasses.field(default_factory=list) macs: typing.List[str] = dataclasses.field(default_factory=list) ips: typing.List[str] = dataclasses.field(default_factory=list) disk_reads: typing.List[str] = dataclasses.field(default_factory=list) disk_writes: typing.List[str] = dataclasses.field(default_factory=list) def get_network_info( xml_doc: Document, ) -> typing.Dict[str, str]: """returns the network info""" result_dict = {} interface_types = xml_doc.getElementsByTagName("interface") for interface_type in interface_types: interface_nodes = interface_type.childNodes for interface_node in interface_nodes: if interface_node.nodeName[0:1] != "#": for attr in interface_node.attributes.keys(): result_dict[ interface_node.attributes[attr].name ] = interface_node.attributes[attr].value return result_dict def get_ip_by_arp(mac: str) -> str: """Get ip adress using the arp table.""" # arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}' try: proc1 = subprocess.run( ["/usr/bin/arp", "-n"], capture_output=True, check=True, shell=False, ) proc2 = subprocess.run( ["/usr/bin/grep", mac], input=proc1.stdout, capture_output=True, check=True, shell=False, ) proc3 = subprocess.run( ["/usr/bin/awk", "{print $1}"], input=proc2.stdout, capture_output=True, check=True, shell=False, ) ip_address = proc3.stdout.decode("utf-8").strip("\n") except subprocess.CalledProcessError: ip_address = "N/A" return ip_address def get_disk_info( xml_doc: Document, ) -> typing.Dict[str, str]: """returns the disk info""" result_dict: typing.Dict = {} disk_types = xml_doc.getElementsByTagName("disk") for disk_type in disk_types: disk_nodes = disk_type.childNodes for disk_node in disk_nodes: if disk_node.nodeName[0:1] != "#": for attr in disk_node.attributes.keys(): result_dict[ disk_node.attributes[attr].name ] = disk_node.attributes[attr].value return result_dict def ffs(offset: int, header_list: typing.List[str], numbered: bool, *args): """A simple columnar printer""" max_column_width = [] lines = [] numbers_f: typing.List[int] = [] dummy = [] if numbered: numbers_f.extend(range(1, len(args[-1]) + 1)) max_column_width.append(max(len(repr(number)) for number in numbers_f)) header_list.insert(0, "idx") for arg in args: max_column_width.append(max(len(repr(argette)) for argette in arg)) index = range(0, len(header_list)) for header, width, i in zip(header_list, max_column_width, index): max_column_width[i] = max(len(header), width) + offset for i in index: dummy.append( Colors.greenie + Colors.BOLD + header_list[i].ljust(max_column_width[i]) + Colors.ENDC ) lines.append("".join(dummy)) dummy.clear() index2 = range(0, len(args[-1])) for i in index2: if numbered: dummy.append( Colors.goo + Colors.BOLD + repr(i).ljust(max_column_width[0]) + Colors.ENDC ) for arg, width in zip(args, max_column_width[1:]): dummy.append( Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC ) else: for arg, width in zip(args, max_column_width): dummy.append( Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC ) lines.append("".join(dummy)) dummy.clear() return lines def size_abr(num: float, shift_by: float) -> str: """Rounds and abbreviates floats.""" num = num * shift_by if num < 1000: return repr(num) if num < 1_000_000.0: return repr(round(num / 1_000, 2)) + " KB" if num < 1_000_000_000: return repr(round(num / 1_000_000, 2)) + " MB" if num < 1_000_000_000_000: return repr(round(num / 1_000_000_000, 2)) + " GB" return "N/A" def fill_virt_data_uri(conn, active_hosts, virt_data: VirtData) -> None: """fill VirtData for one URI.""" for host_id in active_hosts: dom = conn.lookupByID(host_id) virt_data.cpu_times.append( repr( int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000) ) + "s" ) xml_doc = minidom.parseString(dom.XMLDesc()) virt_data.name.append(dom.name()) mem_stats = dom.memoryStats() virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) virt_data.mem_unused.append(size_abr(mem_stats["unused"], 1000)) tree = ElementTree.fromstring(dom.XMLDesc()) iface = tree.find("devices/interface/target").get("dev") stats = dom.interfaceStats(iface) virt_data.write_bytes.append(size_abr(stats[4], 1)) virt_data.read_bytes.append(size_abr(stats[0], 1)) disk_info = get_disk_info(xml_doc) image_name = disk_info["file"] _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) virt_data.disk_reads.append(size_abr(rd_bytes, 1)) virt_data.disk_writes.append(size_abr(wr_bytes, 1)) network_info = get_network_info(xml_doc) virt_data.macs.append(network_info["address"]) virt_data.ips.append(get_ip_by_arp(network_info["address"])) def main() -> None: """entrypoint""" argparser = Argparser() virt_data = VirtData() for hv_host in argparser.args.uri: conn = libvirt.openReadOnly(hv_host) active_hosts = conn.listDomainsID() fill_virt_data_uri(conn, active_hosts, virt_data) lines = ffs( 3, [ "name", "cpu", "mem_actual", "mem_unused", "net_write_B", "net_read_B", "MAC", "IP", "IO_read_B", "IO_write_B", ], True, virt_data.name, virt_data.cpu_times, virt_data.mem_actual, virt_data.mem_unused, virt_data.write_bytes, virt_data.read_bytes, virt_data.macs, virt_data.ips, virt_data.disk_reads, virt_data.disk_writes, ) for line in lines: print(line) # time.sleep(argparser.args.delay) if __name__ == "__main__": main()