diff options
Diffstat (limited to 'bin/virttop')
| -rwxr-xr-x | bin/virttop | 241 | 
1 files changed, 241 insertions, 0 deletions
| diff --git a/bin/virttop b/bin/virttop new file mode 100755 index 0000000..4310d5d --- /dev/null +++ b/bin/virttop @@ -0,0 +1,241 @@ +#!/usr/bin/env python +"""virt top""" + +# ideally we would like to use the monkeypatch but it is untested +# and experimental +# import defusedxml  # type:ignore +# defusedxml.defuse_stdlib() +import argparse +import subprocess +import time +import typing +from defusedxml import ElementTree  # type:ignore +from defusedxml import minidom +import libvirt  # type:ignore + +hv = ["qemu:///system"] + + +class Argparser:  # pylint: disable=too-few-public-methods +    """Argparser class.""" + +    def __init__(self): +        self.parser = argparse.ArgumentParser() +        self.parser.add_argument( +            "--delay", +            "-d", +            type=float, +            help="the delay between updates", +            default=5, +        ) +        self.args = self.parser.parse_args() + + +# pylint: disable=too-few-public-methods +class Colors: +    """static color definitions""" + +    purple = "\033[95m" +    blue = "\033[94m" +    green = "\033[92m" +    yellow = "\033[93m" +    red = "\033[91m" +    grey = "\033[1;37m" +    darkgrey = "\033[1;30m" +    cyan = "\033[1;36m" +    ENDC = "\033[0m" +    BOLD = "\033[1m" +    UNDERLINE = "\033[4m" +    blueblue = "\x1b[38;5;24m" +    greenie = "\x1b[38;5;23m" +    goo = "\x1b[38;5;22m" + + +def get_network_info(xml) -> typing.Dict[str, str]: +    """returns the network info""" +    result_dict = {} +    interface_types = xml.getElementsByTagName("interface") +    for interface_type in interface_types: +        interface_nodes = interface_type.childNodes +        for interface_node in interface_nodes: +            if interface_node.nodeName[0:1] != "#": +                for attr in interface_node.attributes.keys(): +                    result_dict[ +                        interface_node.attributes[attr].name +                    ] = interface_node.attributes[attr].value +    return result_dict + + +def get_ip_by_arp(mac: str) -> str: +    """get ip adress using arp""" +    # arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}' +    try: +        proc1 = subprocess.run(["arp", "-n"], capture_output=True, check=True) +        proc2 = subprocess.run( +            ["grep", mac], input=proc1.stdout, capture_output=True, check=True +        ) +        proc3 = subprocess.run( +            ["awk", "{print $1}"], +            input=proc2.stdout, +            capture_output=True, +            check=True, +        ) +        ip_address = proc3.stdout.decode("utf-8").strip("\n") +    except: +        ip_address = "N/A" +    return ip_address + + +def get_disk_info(xml) -> typing.Dict[str, str]: +    """returns the disk info""" +    result_dict: typing.Dict = {} +    disk_types = xml.getElementsByTagName("disk") +    for disk_type in disk_types: +        disk_nodes = disk_type.childNodes +        for disk_node in disk_nodes: +            if disk_node.nodeName[0:1] != "#": +                for attr in disk_node.attributes.keys(): +                    result_dict[ +                        disk_node.attributes[attr].name +                    ] = disk_node.attributes[attr].value + +    return result_dict + + +def ffs(offset, header_list, numbered, *args): +    """A simple columnar printer""" +    max_column_width = [] +    lines = [] +    numbers_f = [] +    dummy = [] + +    if numbered: +        numbers_f.extend(range(1, len(args[-1]) + 1)) +        max_column_width.append( +            max([len(repr(number)) for number in numbers_f]) +        ) +        header_list.insert(0, "idx") + +    for arg in args: +        max_column_width.append(max([len(repr(argette)) for argette in arg])) + +    index = range(0, len(header_list)) +    for header, width, i in zip(header_list, max_column_width, index): +        max_column_width[i] = max(len(header), width) + offset + +    for i in index: +        dummy.append( +            Colors.greenie +            + Colors.BOLD +            + header_list[i].ljust(max_column_width[i]) +            + Colors.ENDC +        ) +    lines.append("".join(dummy)) +    dummy.clear() + +    index2 = range(0, len(args[-1])) +    for i in index2: +        if numbered: +            dummy.append( +                Colors.goo +                + Colors.BOLD +                + repr(i).ljust(max_column_width[0]) +                + Colors.ENDC +            ) +            for arg, width in zip(args, max_column_width[1:]): +                dummy.append( +                    Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC +                ) +        else: +            for arg, width in zip(args, max_column_width): +                dummy.append( +                    Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC +                ) +        lines.append("".join(dummy)) +        dummy.clear() +    return lines + + +def size_abr(num: float, shift_by: float) -> str: +    num = num * shift_by +    if num < 1000: +        return repr(num) +    elif num < 1_000_000.0: +        return repr(num / 1_000) + " KB" +    elif num < 1_000_000_000: +        return repr(num / 1_000_000) + " MB" +    elif num < 1_000_000_000_000: +        return repr(num / 1_000_000_000) + " GB" +    return "N/A" + + +def main() -> None: +    """entrypoint""" +    # argparser = Argparser() +    name = [] +    cpu_times = [] +    mem_actual = [] +    mem_unused = [] +    write_bytes = [] +    read_bytes = [] +    macs = [] +    ips = [] +    for hv_host in hv: +        conn = libvirt.openReadOnly(hv_host) +        active_hosts = conn.listDomainsID() +        for host_id in active_hosts: +            dom = conn.lookupByID(host_id) +            cpu_times.append( +                repr( +                    int( +                        dom.getCPUStats(total=True)[0]["cpu_time"] +                        / 1_000_000_000 +                    ) +                ) +                + " s" +            ) +            raw_xml = dom.XMLDesc() +            xml = minidom.parseString(raw_xml) +            name.append(dom.name()) +            mem_stats = dom.memoryStats() +            mem_actual.append(size_abr(mem_stats["actual"], 1000)) +            mem_unused.append(size_abr(mem_stats["unused"], 1000)) +            tree = ElementTree.fromstring(dom.XMLDesc()) +            iface = tree.find("devices/interface/target").get("dev") +            stats = dom.interfaceStats(iface) +            write_bytes.append(size_abr(stats[4], 1)) +            read_bytes.append(size_abr(stats[0], 1)) +            get_disk_info(xml) +            network_info = get_network_info(xml) +            macs.append(network_info["address"]) +            ips.append(get_ip_by_arp(network_info["address"])) + +        lines = ffs( +            3, +            [ +                "name", +                "cpu", +                "mem_actual", +                "mem_unused", +                "write_B", +                "read_B", +                "MAC", +                "IP", +            ], +            True, +            name, +            cpu_times, +            mem_actual, +            mem_unused, +            write_bytes, +            read_bytes, +            macs, +            ips, +        ) +        for line in lines: +            print(line) +        # time.sleep(argparser.args.delay) + + +if __name__ == "__main__": +    main() | 
