# cgit page header: about | summary | refs | log | blame | commit | diff | stats
# path: root/bin/virttop
# blob: 4310d5d033fbf67831e7f2e78fe0186375b914c0 (plain) (tree)
















































































































































































































































                                                                              
#!/usr/bin/env python
"""virt top"""

# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml  # type:ignore
# defusedxml.defuse_stdlib()
import argparse
import subprocess
import time
import typing
from defusedxml import ElementTree  # type:ignore
from defusedxml import minidom
import libvirt  # type:ignore

# Hypervisor URIs that main() opens read-only through libvirt, one per entry.
hv = ["qemu:///system"]


class Argparser:  # pylint: disable=too-few-public-methods
    """Parse virttop's command-line options.

    After construction ``parser`` holds the ``argparse.ArgumentParser`` and
    ``args`` holds the namespace parsed from the process arguments.
    """

    def __init__(self):
        cli = argparse.ArgumentParser()
        # the single option: refresh interval, a float that defaults to 5
        delay_option = {
            "type": float,
            "help": "the delay between updates",
            "default": 5,
        }
        cli.add_argument("--delay", "-d", **delay_option)
        self.parser = cli
        self.args = cli.parse_args()


# pylint: disable=too-few-public-methods
class Colors:
    """ANSI escape sequences used to colour the terminal output."""

    # reset and text attributes
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # foreground colours, SGR codes 91-95
    red = "\033[91m"
    green = "\033[92m"
    yellow = "\033[93m"
    blue = "\033[94m"
    purple = "\033[95m"

    # bold attribute combined with a basic foreground colour (1;3x)
    darkgrey = "\033[1;30m"
    cyan = "\033[1;36m"
    grey = "\033[1;37m"

    # entries of the 256-colour palette (38;5;N)
    goo = "\x1b[38;5;22m"
    greenie = "\x1b[38;5;23m"
    blueblue = "\x1b[38;5;24m"


def get_network_info(xml) -> typing.Dict[str, str]:
    """Collect the attributes of every child element of each <interface>.

    All attributes are merged into one flat dict keyed by attribute name, so
    when a name occurs more than once the last occurrence wins.
    """
    collected = {}
    for iface in xml.getElementsByTagName("interface"):
        for child in iface.childNodes:
            # node names starting with "#" (e.g. "#text") are not elements
            if child.nodeName.startswith("#"):
                continue
            collected.update(child.attributes.items())
    return collected


def get_ip_by_arp(mac: str) -> str:
    """Look up the IP address that the host's ARP table lists for *mac*.

    Runs the equivalent of ``arp -n | grep <mac> | awk '{print $1}'``.

    Args:
        mac: MAC address to search for; it is passed to ``grep`` as the
            pattern.

    Returns:
        The first column of every matching ``arp -n`` line (separated by
        newlines if several lines match), or ``"N/A"`` when one of the tools
        cannot be started, a command exits with a non-zero status (``grep``
        does so when nothing matches) or the output cannot be decoded.
    """
    try:
        arp_table = subprocess.run(
            ["arp", "-n"], capture_output=True, check=True
        )
        matching = subprocess.run(
            ["grep", mac],
            input=arp_table.stdout,
            capture_output=True,
            check=True,
        )
        first_column = subprocess.run(
            ["awk", "{print $1}"],
            input=matching.stdout,
            capture_output=True,
            check=True,
        )
        return first_column.stdout.decode("utf-8").strip("\n")
    except (OSError, ValueError, subprocess.SubprocessError):
        # Only lookup failures are treated as "unknown address":
        # OSError covers a missing or non-executable tool,
        # SubprocessError a non-zero exit status, ValueError an undecodable
        # output. KeyboardInterrupt and SystemExit still propagate.
        return "N/A"


def get_disk_info(xml) -> typing.Dict[str, str]:
    """Gather the attributes of all child elements found under <disk> tags.

    The result is a single flat mapping from attribute name to value; a name
    seen again later replaces the earlier value.
    """
    # every child of every <disk>, minus nodes named "#..." (text, comments)
    elements = (
        node
        for disk in xml.getElementsByTagName("disk")
        for node in disk.childNodes
        if node.nodeName[:1] != "#"
    )
    info: typing.Dict = {}
    for element in elements:
        attrs = element.attributes
        for key in attrs.keys():
            attribute = attrs[key]
            info[attribute.name] = attribute.value
    return info


def ffs(offset, header_list, numbered, *args):
    """Render equally long columns as a coloured, left-aligned text table.

    Args:
        offset: number of padding characters added to every column width.
        header_list: column titles, one per entry in ``args``. The list is
            left untouched.
        numbered: when true, an ``idx`` column holding the zero-based row
            number is placed in front of the data columns.
        *args: the columns, each a sequence of cell values. Cells are
            rendered with ``repr``. The last column determines the number of
            rows.

    Returns:
        A list of strings: the header line followed by one line per row.
        With empty columns only the header line is returned.
    """
    # Work on a copy so the caller's header list is never modified.
    headers = list(header_list)
    row_count = len(args[-1]) if args else 0

    widths = []
    if numbered:
        # The index column is sized for the numbers 1..row_count.
        widths.append(
            max((len(repr(n)) for n in range(1, row_count + 1)), default=0)
        )
        headers.insert(0, "idx")
    for column in args:
        # default=0 keeps an empty column from raising ValueError
        widths.append(max((len(repr(cell)) for cell in column), default=0))

    # A column is as wide as its longest cell or its title, plus the padding.
    for i, (title, width) in enumerate(zip(headers, list(widths))):
        widths[i] = max(len(title), width) + offset

    lines = [
        "".join(
            Colors.greenie + Colors.BOLD + title.ljust(widths[i]) + Colors.ENDC
            for i, title in enumerate(headers)
        )
    ]

    data_widths = widths[1:] if numbered else widths
    for row in range(row_count):
        cells = []
        if numbered:
            cells.append(
                Colors.goo
                + Colors.BOLD
                + repr(row).ljust(widths[0])
                + Colors.ENDC
            )
        cells.extend(
            Colors.blueblue + repr(column[row]).ljust(width) + Colors.ENDC
            for column, width in zip(args, data_widths)
        )
        lines.append("".join(cells))
    return lines


def size_abr(num: float, shift_by: float) -> str:
    """Format ``num * shift_by`` as a short size string in decimal units.

    Args:
        num: the raw quantity.
        shift_by: factor applied to ``num`` before formatting.

    Returns:
        ``repr`` of the scaled value without a unit when it is below 1000,
        otherwise the value divided by the matching power of 1000 followed by
        " KB", " MB", " GB" or " TB". ``"N/A"`` is returned for anything that
        does not fall below 10**15.
    """
    scaled = num * shift_by
    if scaled < 1000:
        return repr(scaled)
    divisor = 1_000
    for suffix in (" KB", " MB", " GB", " TB"):
        # each unit covers the range [divisor, divisor * 1000)
        if scaled < divisor * 1_000:
            return repr(scaled / divisor) + suffix
        divisor *= 1_000
    return "N/A"


def main() -> None:
    """Print a table with one row per active domain of each host in ``hv``.

    Every row shows the domain name, its total CPU time (the ``cpu_time``
    counter divided by 10**9, labelled in seconds), the "actual" and "unused"
    memory statistics, two byte counters of the first network interface that
    has a target device, and the MAC address with the IP address that the ARP
    table lists for it. Values that a domain does not provide are shown as
    "N/A".
    """
    # argparser = Argparser()
    name: typing.List[str] = []
    cpu_times: typing.List[str] = []
    mem_actual: typing.List[str] = []
    mem_unused: typing.List[str] = []
    write_bytes: typing.List[str] = []
    read_bytes: typing.List[str] = []
    macs: typing.List[str] = []
    ips: typing.List[str] = []
    for hv_host in hv:
        conn = libvirt.openReadOnly(hv_host)
        try:
            for host_id in conn.listDomainsID():
                dom = conn.lookupByID(host_id)
                name.append(dom.name())

                cpu_seconds = int(
                    dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000
                )
                cpu_times.append(repr(cpu_seconds) + " s")

                # Not every domain reports every memory statistic.
                mem_stats = dom.memoryStats()
                for key, column in (
                    ("actual", mem_actual),
                    ("unused", mem_unused),
                ):
                    if key in mem_stats:
                        column.append(size_abr(mem_stats[key], 1000))
                    else:
                        column.append("N/A")

                # Fetch the description once and reuse it for both parsers.
                raw_xml = dom.XMLDesc()

                target = ElementTree.fromstring(raw_xml).find(
                    "devices/interface/target"
                )
                if target is None:
                    # no interface with a target device to read counters from
                    write_bytes.append("N/A")
                    read_bytes.append("N/A")
                else:
                    stats = dom.interfaceStats(target.get("dev"))
                    write_bytes.append(size_abr(stats[4], 1))
                    read_bytes.append(size_abr(stats[0], 1))

                mac = get_network_info(minidom.parseString(raw_xml)).get(
                    "address"
                )
                if mac is None:
                    macs.append("N/A")
                    ips.append("N/A")
                else:
                    macs.append(mac)
                    ips.append(get_ip_by_arp(mac))
        finally:
            # release the hypervisor connection even if a query failed
            conn.close()

        if not name:
            # no rows collected so far; skip rendering an empty table
            continue

        lines = ffs(
            3,
            [
                "name",
                "cpu",
                "mem_actual",
                "mem_unused",
                "write_B",
                "read_B",
                "MAC",
                "IP",
            ],
            True,
            name,
            cpu_times,
            mem_actual,
            mem_unused,
            write_bytes,
            read_bytes,
            macs,
            ips,
        )
        for line in lines:
            print(line)
        # time.sleep(argparser.args.delay)


# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()