#!/usr/bin/env python """virt top""" # ideally we would like to use the monkeypatch but it is untested # and experimental # import defusedxml # type:ignore # defusedxml.defuse_stdlib() import argparse import csv import dataclasses import enum import os import signal import sys import time import typing # we are only using this for type annotation from xml.dom.minidom import Document # nosec from defusedxml import ElementTree # type:ignore from defusedxml import minidom import libvirt # type:ignore # pylint: disable=unused-argument def sig_handler_sigint(signum, frame): """Just to handle C-c gracefully""" print() sys.exit(0) class Argparser: # pylint: disable=too-few-public-methods """Argparser class.""" def __init__(self): self.parser = argparse.ArgumentParser() self.parser.add_argument( "--delay", "-d", type=float, help="The delay between updates", default=5, ) self.parser.add_argument( "--uri", "-u", nargs="+", type=str, help="A list of URIs to connect to seperated by commas", default=["qemu:///system"], ) self.args = self.parser.parse_args() # pylint: disable=too-few-public-methods class Colors(enum.EnumType): """static color definitions""" purple = "\033[95m" blue = "\033[94m" green = "\033[92m" yellow = "\033[93m" red = "\033[91m" grey = "\033[1;37m" darkgrey = "\033[1;30m" cyan = "\033[1;36m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" blueblue = "\x1b[38;5;24m" greenie = "\x1b[38;5;23m" goo = "\x1b[38;5;22m" screen_clear = "\033c\033[3J" hide_cursor = "\033[?25l" @dataclasses.dataclass # pylint: disable=too-many-instance-attributes class VirtData: """Holds the data that we collect to display to the user""" vm_id: typing.List[str] = dataclasses.field(default_factory=list) name: typing.List[str] = dataclasses.field(default_factory=list) cpu_times: typing.List[str] = dataclasses.field(default_factory=list) mem_actual: typing.List[str] = dataclasses.field(default_factory=list) mem_unused: typing.List[str] = dataclasses.field(default_factory=list) write_bytes: typing.List[str] = dataclasses.field(default_factory=list) read_bytes: typing.List[str] = dataclasses.field(default_factory=list) macs: typing.List[str] = dataclasses.field(default_factory=list) ips: typing.List[str] = dataclasses.field(default_factory=list) disk_reads: typing.List[str] = dataclasses.field(default_factory=list) disk_writes: typing.List[str] = dataclasses.field(default_factory=list) snapshot_counts: typing.List[str] = dataclasses.field(default_factory=list) uri: typing.List[str] = dataclasses.field(default_factory=list) memory_pool: typing.List[str] = dataclasses.field(default_factory=list) pools: typing.List[libvirt.virStoragePool] = dataclasses.field( default_factory=list ) def get_network_info( xml_doc: Document, ) -> typing.Dict[str, str]: """returns the network info""" result_dict = {} interface_types = xml_doc.getElementsByTagName("interface") for interface_type in interface_types: interface_nodes = interface_type.childNodes for interface_node in interface_nodes: if interface_node.nodeName[0:1] != "#": for attr in interface_node.attributes.keys(): result_dict[ interface_node.attributes[attr].name ] = interface_node.attributes[attr].value return result_dict def get_arp_table() -> typing.Dict[str, str]: """Get the ARP table. return a dict with MAC/IP as key/value pair.""" result: typing.Dict[str, str] = {} with open("/proc/net/arp", encoding="utf-8") as arp_table: reader = csv.reader(arp_table, skipinitialspace=True, delimiter=" ") for arp_entry in reader: result[arp_entry[3]] = arp_entry[0] return result def get_ip_from_arp_table(arp_table: typing.Dict[str, str], mac: str) -> str: """get IP from MAC address using the arp table""" try: return arp_table[mac] except KeyError: return "N/A" def get_disk_info( xml_doc: Document, ) -> typing.Dict[str, str]: """returns the disk info""" result_dict: typing.Dict = {} disk_types = xml_doc.getElementsByTagName("disk") for disk_type in disk_types: disk_nodes = disk_type.childNodes for disk_node in disk_nodes: if disk_node.nodeName[0:1] != "#": for attr in disk_node.attributes.keys(): result_dict[ disk_node.attributes[attr].name ] = disk_node.attributes[attr].value return result_dict # pylint: disable=too-many-locals def ffs( offset: int, header_list: typing.Optional[typing.List[str]], numbered: bool, *args, ) -> typing.List[str]: """A simple columnar printer""" max_column_width = [] lines = [] numbers_f: typing.List[int] = [] dummy = [] if sys.stdout.isatty(): greenie = Colors.greenie bold = Colors.BOLD endc = Colors.ENDC goo = Colors.goo blueblue = Colors.blueblue else: greenie = "" bold = "" endc = "" goo = "" blueblue = "" for arg in args: max_column_width.append(max(len(repr(argette)) for argette in arg)) if header_list is not None: if numbered: numbers_f.extend(range(1, len(args[-1]) + 1)) max_column_width.append( max(len(repr(number)) for number in numbers_f) ) header_list.insert(0, "idx") index = range(0, len(header_list)) for header, width, i in zip(header_list, max_column_width, index): max_column_width[i] = max(len(header), width) + offset for i in index: dummy.append( greenie + bold + header_list[i].ljust(max_column_width[i]) + endc ) lines.append("".join(dummy)) dummy.clear() index2 = range(0, len(args[-1])) for i in index2: if numbered: dummy.append( goo + bold + repr(i).ljust(max_column_width[0]) + endc ) for arg, width in zip(args, max_column_width[1:]): dummy.append(blueblue + (arg[i]).ljust(width) + endc) else: for arg, width in zip(args, max_column_width): dummy.append(blueblue + (arg[i]).ljust(width) + endc) lines.append("".join(dummy)) dummy.clear() return lines def size_abr(num: float, shift_by: float) -> str: """Rounds and abbreviates floats.""" num = num * shift_by if num < 1000: return repr(num) if num < 1_000_000.0: return repr(round(num / 1_000, 2)) + " KB" if num < 1_000_000_000: return repr(round(num / 1_000_000, 2)) + " MB" if num < 1_000_000_000_000: return repr(round(num / 1_000_000_000, 2)) + " GB" return "N/A" # pylint: disable=too-many-locals def fill_virt_data_uri( conn: libvirt.virConnect, active_hosts: typing.List[int], virt_data: VirtData, arp_table: typing.Dict[str, str], ) -> None: """fill VirtData for one URI.""" for host_id in active_hosts: virt_data.uri.append(conn.getURI()) dom = conn.lookupByID(host_id) virt_data.snapshot_counts.append(repr(dom.snapshotNum())) virt_data.cpu_times.append( repr( int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000) ) + "s" ) xml_doc = minidom.parseString(dom.XMLDesc()) virt_data.name.append(dom.name()) mem_stats = dom.memoryStats() if "actual" in mem_stats: virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) else: virt_data.mem_actual.append("n/a") # BSD guests dont support mem balloons? try: virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000)) except KeyError: virt_data.mem_unused.append("N/A") tree = ElementTree.fromstring(dom.XMLDesc()) iface = tree.find("devices/interface/target").get("dev") stats = dom.interfaceStats(iface) virt_data.write_bytes.append(size_abr(stats[4], 1)) virt_data.read_bytes.append(size_abr(stats[0], 1)) found_the_pool: bool = False disk = tree.find("devices/disk/source").get("file") for pool in virt_data.pools: if os.path.basename(disk) in pool.listVolumes(): virt_data.memory_pool.append(pool.name()) found_the_pool = True # you could delete the pool but keep the volumes inside # which results in a functional VM but it wont have a # volume inside a pool that we can detect if not found_the_pool: virt_data.memory_pool.append("N/A") disk_info = get_disk_info(xml_doc) image_name = disk_info["file"] _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) virt_data.disk_reads.append(size_abr(rd_bytes, 1)) virt_data.disk_writes.append(size_abr(wr_bytes, 1)) network_info = get_network_info(xml_doc) virt_data.macs.append(network_info["address"]) # virt_data.ips.append(get_ip_by_arp(network_info["address"])) # TODO-this is obviously not going to work for remote URIs virt_data.ips.append( get_ip_from_arp_table(arp_table, network_info["address"]) ) def main() -> None: """entrypoint""" signal.signal(signal.SIGINT, sig_handler_sigint) argparser = Argparser() print(Colors.screen_clear, end="") while True: virt_data = VirtData() arp_table = get_arp_table() for hv_host in argparser.args.uri: conn = libvirt.openReadOnly(hv_host) active_hosts = conn.listDomainsID() if len(active_hosts) > 0: virt_data.pools = conn.listAllStoragePools() # for pool in virt_data.pools: # print(pool.listVolumes()) # networks = conn.listAllNetworks() # print([pool.name() for pool in conn.listAllStoragePools()]) # print([net.name() for net in conn.listAllNetworks()]) virt_data.vm_id = [ repr(vm_id) for vm_id in conn.listDomainsID() ] fill_virt_data_uri(conn, active_hosts, virt_data, arp_table) # for conn_id in conn.listAllDomains(): # print(conn_id.name()) # print(conn_id.state()) else: print("no active VMs found.") sys.exit(1) lines = ffs( 2, [ "ID", "NAME", "CPU", "MEM_ACTUAL", "MEM_AVAIL", "NET_WRITE_B", "NET_READ_B", "MAC", "IP", "IO_READ_B", "IO_WRITE_B", "SNAPSHOTS", "URI", "STORAGE_POOL", ], False, virt_data.vm_id, virt_data.name, virt_data.cpu_times, virt_data.mem_actual, virt_data.mem_unused, virt_data.write_bytes, virt_data.read_bytes, virt_data.macs, virt_data.ips, virt_data.disk_reads, virt_data.disk_writes, virt_data.snapshot_counts, virt_data.uri, virt_data.memory_pool, ) for line in lines: print(line) time.sleep(argparser.args.delay) # clears the screen print(Colors.screen_clear, end="") print(Colors.hide_cursor, end="") if __name__ == "__main__": main()