diff options
Diffstat (limited to '')
| -rwxr-xr-x | bin/virttop | 376 | 
1 files changed, 0 insertions, 376 deletions
| diff --git a/bin/virttop b/bin/virttop deleted file mode 100755 index 4d8943b..0000000 --- a/bin/virttop +++ /dev/null @@ -1,376 +0,0 @@ -#!/usr/bin/env python -"""virt top""" - -# ideally we would like to use the monkeypatch but it is untested -# and experimental -# import defusedxml  # type:ignore -# defusedxml.defuse_stdlib() -import argparse -import csv -import dataclasses -import enum -import os -import signal -import sys -import time -import typing - -# we are only using this for type annotation -from xml.dom.minidom import Document  # nosec - -from defusedxml import ElementTree  # type:ignore -from defusedxml import minidom -import libvirt  # type:ignore - - -# pylint: disable=unused-argument -def sig_handler_sigint(signum, frame): -    """Just to handle C-c gracefully""" -    print() -    sys.exit(0) - - -class Argparser:  # pylint: disable=too-few-public-methods -    """Argparser class.""" - -    def __init__(self): -        self.parser = argparse.ArgumentParser() -        self.parser.add_argument( -            "--delay", -            "-d", -            type=float, -            help="The delay between updates", -            default=5, -        ) -        self.parser.add_argument( -            "--uri", -            "-u", -            nargs="+", -            type=str, -            help="A list of URIs to connect to seperated by commas", -            default=["qemu:///system"], -        ) -        self.args = self.parser.parse_args() - - -# pylint: disable=too-few-public-methods -class Colors(enum.EnumType): -    """static color definitions""" - -    purple = "\033[95m" -    blue = "\033[94m" -    green = "\033[92m" -    yellow = "\033[93m" -    red = "\033[91m" -    grey = "\033[1;37m" -    darkgrey = "\033[1;30m" -    cyan = "\033[1;36m" -    ENDC = "\033[0m" -    BOLD = "\033[1m" -    UNDERLINE = "\033[4m" -    blueblue = "\x1b[38;5;24m" -    greenie = "\x1b[38;5;23m" -    goo = "\x1b[38;5;22m" -    screen_clear = "\033c\033[3J" -    hide_cursor = "\033[?25l" - - -@dataclasses.dataclass -# pylint: disable=too-many-instance-attributes -class VirtData: -    """Holds the data that we collect to display to the user""" - -    vm_id: typing.List[str] = dataclasses.field(default_factory=list) -    name: typing.List[str] = dataclasses.field(default_factory=list) -    cpu_times: typing.List[str] = dataclasses.field(default_factory=list) -    mem_actual: typing.List[str] = dataclasses.field(default_factory=list) -    mem_unused: typing.List[str] = dataclasses.field(default_factory=list) -    write_bytes: typing.List[str] = dataclasses.field(default_factory=list) -    read_bytes: typing.List[str] = dataclasses.field(default_factory=list) -    macs: typing.List[str] = dataclasses.field(default_factory=list) -    ips: typing.List[str] = dataclasses.field(default_factory=list) -    disk_reads: typing.List[str] = dataclasses.field(default_factory=list) -    disk_writes: typing.List[str] = dataclasses.field(default_factory=list) -    snapshot_counts: typing.List[str] = dataclasses.field(default_factory=list) -    uri: typing.List[str] = dataclasses.field(default_factory=list) -    memory_pool: typing.List[str] = dataclasses.field(default_factory=list) - -    pools: typing.List[libvirt.virStoragePool] = dataclasses.field( -        default_factory=list -    ) - - -def get_network_info( -    xml_doc: Document, -) -> typing.Dict[str, str]: -    """returns the network info""" -    result_dict = {} -    interface_types = xml_doc.getElementsByTagName("interface") -    for interface_type in interface_types: -        interface_nodes = interface_type.childNodes -        for interface_node in interface_nodes: -            if interface_node.nodeName[0:1] != "#": -                for attr in interface_node.attributes.keys(): -                    result_dict[ -                        interface_node.attributes[attr].name -                    ] = interface_node.attributes[attr].value -    return result_dict - - -def get_arp_table() -> typing.Dict[str, str]: -    """Get the ARP table. return a dict with MAC/IP as key/value pair.""" -    result: typing.Dict[str, str] = {} -    with open("/proc/net/arp", encoding="utf-8") as arp_table: -        reader = csv.reader(arp_table, skipinitialspace=True, delimiter=" ") -        for arp_entry in reader: -            result[arp_entry[3]] = arp_entry[0] - -    return result - - -def get_ip_from_arp_table(arp_table: typing.Dict[str, str], mac: str) -> str: -    """get IP from MAC address using the arp table""" -    try: -        return arp_table[mac] -    except KeyError: -        return "N/A" - - -def get_disk_info( -    xml_doc: Document, -) -> typing.Dict[str, str]: -    """returns the disk info""" -    result_dict: typing.Dict = {} -    disk_types = xml_doc.getElementsByTagName("disk") -    for disk_type in disk_types: -        disk_nodes = disk_type.childNodes -        for disk_node in disk_nodes: -            if disk_node.nodeName[0:1] != "#": -                for attr in disk_node.attributes.keys(): -                    result_dict[ -                        disk_node.attributes[attr].name -                    ] = disk_node.attributes[attr].value - -    return result_dict - - -# pylint: disable=too-many-locals -def ffs( -    offset: int, -    header_list: typing.Optional[typing.List[str]], -    numbered: bool, -    *args, -) -> typing.List[str]: -    """A simple columnar printer""" -    max_column_width = [] -    lines = [] -    numbers_f: typing.List[int] = [] -    dummy = [] - -    if sys.stdout.isatty(): -        greenie = Colors.greenie -        bold = Colors.BOLD -        endc = Colors.ENDC -        goo = Colors.goo -        blueblue = Colors.blueblue -    else: -        greenie = "" -        bold = "" -        endc = "" -        goo = "" -        blueblue = "" - -    for arg in args: -        max_column_width.append(max(len(repr(argette)) for argette in arg)) - -    if header_list is not None: -        if numbered: -            numbers_f.extend(range(1, len(args[-1]) + 1)) -            max_column_width.append( -                max(len(repr(number)) for number in numbers_f) -            ) -            header_list.insert(0, "idx") - -        index = range(0, len(header_list)) -        for header, width, i in zip(header_list, max_column_width, index): -            max_column_width[i] = max(len(header), width) + offset - -        for i in index: -            dummy.append( -                greenie -                + bold -                + header_list[i].ljust(max_column_width[i]) -                + endc -            ) -        lines.append("".join(dummy)) -        dummy.clear() - -    index2 = range(0, len(args[-1])) -    for i in index2: -        if numbered: -            dummy.append( -                goo + bold + repr(i).ljust(max_column_width[0]) + endc -            ) -            for arg, width in zip(args, max_column_width[1:]): -                dummy.append(blueblue + (arg[i]).ljust(width) + endc) -        else: -            for arg, width in zip(args, max_column_width): -                dummy.append(blueblue + (arg[i]).ljust(width) + endc) -        lines.append("".join(dummy)) -        dummy.clear() -    return lines - - -def size_abr(num: float, shift_by: float) -> str: -    """Rounds and abbreviates floats.""" -    num = num * shift_by -    if num < 1000: -        return repr(num) -    if num < 1_000_000.0: -        return repr(round(num / 1_000, 2)) + " KB" -    if num < 1_000_000_000: -        return repr(round(num / 1_000_000, 2)) + " MB" -    if num < 1_000_000_000_000: -        return repr(round(num / 1_000_000_000, 2)) + " GB" -    return "N/A" - - -# pylint: disable=too-many-locals -def fill_virt_data_uri( -    conn: libvirt.virConnect, -    active_hosts: typing.List[int], -    virt_data: VirtData, -    arp_table: typing.Dict[str, str], -) -> None: -    """fill VirtData for one URI.""" -    for host_id in active_hosts: -        virt_data.uri.append(conn.getURI()) -        dom = conn.lookupByID(host_id) -        virt_data.snapshot_counts.append(repr(dom.snapshotNum())) -        virt_data.cpu_times.append( -            repr( -                int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000) -            ) -            + "s" -        ) -        xml_doc = minidom.parseString(dom.XMLDesc()) -        virt_data.name.append(dom.name()) - -        mem_stats = dom.memoryStats() -        if "actual" in mem_stats: -            virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) -        else: -            virt_data.mem_actual.append("n/a") - -        # BSD guests dont support mem balloons? -        try: -            virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000)) -        except KeyError: -            virt_data.mem_unused.append("N/A") - -        tree = ElementTree.fromstring(dom.XMLDesc()) -        iface = tree.find("devices/interface/target").get("dev") -        stats = dom.interfaceStats(iface) -        virt_data.write_bytes.append(size_abr(stats[4], 1)) -        virt_data.read_bytes.append(size_abr(stats[0], 1)) - -        found_the_pool: bool = False -        disk = tree.find("devices/disk/source").get("file") -        for pool in virt_data.pools: -            if os.path.basename(disk) in pool.listVolumes(): -                virt_data.memory_pool.append(pool.name()) -                found_the_pool = True -        # you could delete the pool but keep the volumes inside -        # which results in a functional VM but it wont have a -        # volume inside a pool that we can detect -        if not found_the_pool: -            virt_data.memory_pool.append("N/A") - -        disk_info = get_disk_info(xml_doc) -        image_name = disk_info["file"] -        _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) -        virt_data.disk_reads.append(size_abr(rd_bytes, 1)) -        virt_data.disk_writes.append(size_abr(wr_bytes, 1)) - -        network_info = get_network_info(xml_doc) -        virt_data.macs.append(network_info["address"]) -        # virt_data.ips.append(get_ip_by_arp(network_info["address"])) -        # TODO-this is obviously not going to work for remote URIs -        virt_data.ips.append( -            get_ip_from_arp_table(arp_table, network_info["address"]) -        ) - - -def main() -> None: -    """entrypoint""" -    signal.signal(signal.SIGINT, sig_handler_sigint) -    argparser = Argparser() -    print(Colors.screen_clear, end="") -    while True: -        virt_data = VirtData() -        arp_table = get_arp_table() -        for hv_host in argparser.args.uri: -            conn = libvirt.openReadOnly(hv_host) -            active_hosts = conn.listDomainsID() -            if len(active_hosts) > 0: -                virt_data.pools = conn.listAllStoragePools() -                # for pool in virt_data.pools: -                #     print(pool.listVolumes()) -                # networks = conn.listAllNetworks() -                # print([pool.name() for pool in conn.listAllStoragePools()]) -                # print([net.name() for net in conn.listAllNetworks()]) -                virt_data.vm_id = [ -                    repr(vm_id) for vm_id in conn.listDomainsID() -                ] -                fill_virt_data_uri(conn, active_hosts, virt_data, arp_table) -                # for conn_id in conn.listAllDomains(): -                #     print(conn_id.name()) -                #     print(conn_id.state()) -            else: -                print("no active VMs found.") -                sys.exit(1) - -        lines = ffs( -            2, -            [ -                "ID", -                "NAME", -                "CPU", -                "MEM_ACTUAL", -                "MEM_AVAIL", -                "NET_WRITE_B", -                "NET_READ_B", -                "MAC", -                "IP", -                "IO_READ_B", -                "IO_WRITE_B", -                "SNAPSHOTS", -                "URI", -                "STORAGE_POOL", -            ], -            False, -            virt_data.vm_id, -            virt_data.name, -            virt_data.cpu_times, -            virt_data.mem_actual, -            virt_data.mem_unused, -            virt_data.write_bytes, -            virt_data.read_bytes, -            virt_data.macs, -            virt_data.ips, -            virt_data.disk_reads, -            virt_data.disk_writes, -            virt_data.snapshot_counts, -            virt_data.uri, -            virt_data.memory_pool, -        ) -        for line in lines: -            print(line) -        time.sleep(argparser.args.delay) -        # clears the screen -        print(Colors.screen_clear, end="") -        print(Colors.hide_cursor, end="") - - -if __name__ == "__main__": -    main() | 
