aboutsummaryrefslogblamecommitdiffstats
path: root/bin/virttop
blob: ef62c20a6d2f6ff34835db18d6c9bdc08c15ecaa (plain) (tree)
1
2
3
4
5
6
7
8
9







                                                                 
                  
                 

             
             
                                    



                                                 









                                                          
                                             

                      







                                                                    






















                                            



















                                                                           

                                  
                                                               











                                                             
                                            

                                                         





                                   
                               




                                   

                               
                                           


                                
                        

                                                             
                                         



                          


                           

                                 
                                                     











                                                        
                                                                           


                                   
                                    



                                                     
                                                                               


                                    
                                                                           

























                                                                      
                                                                         



                                                          
                                                                         






                                                 
                                        


                        





                                                          


                

































                                                                               

                    


                                      

                                            





























                                                         




                                          
#!/usr/bin/env python
"""virt top"""

# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml  # type:ignore
# defusedxml.defuse_stdlib()
import argparse
import dataclasses
import subprocess

# import time
import typing
from xml.dom.minidom import Document
from defusedxml import ElementTree  # type:ignore
from defusedxml import minidom
import libvirt  # type:ignore


class Argparser:  # pylint: disable=too-few-public-methods
    """Argparser class."""

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            "--delay",
            "-d",
            type=float,
            help="The delay between updates",
            default=5,
        )
        self.parser.add_argument(
            "--uri",
            "-u",
            nargs="+",
            type=str,
            help="A list of URIs to connect to seperated by commas",
            default=["qemu:///system"],
        )
        self.args = self.parser.parse_args()


# pylint: disable=too-few-public-methods
class Colors:
    """static color definitions"""

    purple = "\033[95m"
    blue = "\033[94m"
    green = "\033[92m"
    yellow = "\033[93m"
    red = "\033[91m"
    grey = "\033[1;37m"
    darkgrey = "\033[1;30m"
    cyan = "\033[1;36m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    blueblue = "\x1b[38;5;24m"
    greenie = "\x1b[38;5;23m"
    goo = "\x1b[38;5;22m"


@dataclasses.dataclass
# pylint: disable=too-many-instance-attributes
class VirtData:
    """Holds the data that we collect to display to the user"""

    name: typing.List[str] = dataclasses.field(default_factory=list)
    cpu_times: typing.List[str] = dataclasses.field(default_factory=list)
    mem_actual: typing.List[str] = dataclasses.field(default_factory=list)
    mem_unused: typing.List[str] = dataclasses.field(default_factory=list)
    write_bytes: typing.List[str] = dataclasses.field(default_factory=list)
    read_bytes: typing.List[str] = dataclasses.field(default_factory=list)
    macs: typing.List[str] = dataclasses.field(default_factory=list)
    ips: typing.List[str] = dataclasses.field(default_factory=list)
    disk_reads: typing.List[str] = dataclasses.field(default_factory=list)
    disk_writes: typing.List[str] = dataclasses.field(default_factory=list)


def get_network_info(
    xml_doc: Document,
) -> typing.Dict[str, str]:
    """returns the network info"""
    result_dict = {}
    interface_types = xml_doc.getElementsByTagName("interface")
    for interface_type in interface_types:
        interface_nodes = interface_type.childNodes
        for interface_node in interface_nodes:
            if interface_node.nodeName[0:1] != "#":
                for attr in interface_node.attributes.keys():
                    result_dict[
                        interface_node.attributes[attr].name
                    ] = interface_node.attributes[attr].value
    return result_dict


def get_ip_by_arp(mac: str) -> str:
    """Get ip adress using the arp table."""
    # arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}'
    try:
        proc1 = subprocess.run(
            ["/usr/bin/arp", "-n"],
            capture_output=True,
            check=True,
            shell=False,
        )
        proc2 = subprocess.run(
            ["/usr/bin/grep", mac],
            input=proc1.stdout,
            capture_output=True,
            check=True,
            shell=False,
        )
        proc3 = subprocess.run(
            ["/usr/bin/awk", "{print $1}"],
            input=proc2.stdout,
            capture_output=True,
            check=True,
            shell=False,
        )
        ip_address = proc3.stdout.decode("utf-8").strip("\n")
    except subprocess.CalledProcessError:
        ip_address = "N/A"
    return ip_address


def get_disk_info(
    xml_doc: Document,
) -> typing.Dict[str, str]:
    """returns the disk info"""
    result_dict: typing.Dict = {}
    disk_types = xml_doc.getElementsByTagName("disk")
    for disk_type in disk_types:
        disk_nodes = disk_type.childNodes
        for disk_node in disk_nodes:
            if disk_node.nodeName[0:1] != "#":
                for attr in disk_node.attributes.keys():
                    result_dict[
                        disk_node.attributes[attr].name
                    ] = disk_node.attributes[attr].value

    return result_dict


def ffs(offset: int, header_list: typing.List[str], numbered: bool, *args):
    """A simple columnar printer"""
    max_column_width = []
    lines = []
    numbers_f: typing.List[int] = []
    dummy = []

    if numbered:
        numbers_f.extend(range(1, len(args[-1]) + 1))
        max_column_width.append(max(len(repr(number)) for number in numbers_f))
        header_list.insert(0, "idx")

    for arg in args:
        max_column_width.append(max(len(repr(argette)) for argette in arg))

    index = range(0, len(header_list))
    for header, width, i in zip(header_list, max_column_width, index):
        max_column_width[i] = max(len(header), width) + offset

    for i in index:
        dummy.append(
            Colors.greenie
            + Colors.BOLD
            + header_list[i].ljust(max_column_width[i])
            + Colors.ENDC
        )
    lines.append("".join(dummy))
    dummy.clear()

    index2 = range(0, len(args[-1]))
    for i in index2:
        if numbered:
            dummy.append(
                Colors.goo
                + Colors.BOLD
                + repr(i).ljust(max_column_width[0])
                + Colors.ENDC
            )
            for arg, width in zip(args, max_column_width[1:]):
                dummy.append(
                    Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC
                )
        else:
            for arg, width in zip(args, max_column_width):
                dummy.append(
                    Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC
                )
        lines.append("".join(dummy))
        dummy.clear()
    return lines


def size_abr(num: float, shift_by: float) -> str:
    """Rounds and abbreviates floats."""
    num = num * shift_by
    if num < 1000:
        return repr(num)
    if num < 1_000_000.0:
        return repr(round(num / 1_000, 2)) + " KB"
    if num < 1_000_000_000:
        return repr(round(num / 1_000_000, 2)) + " MB"
    if num < 1_000_000_000_000:
        return repr(round(num / 1_000_000_000, 2)) + " GB"
    return "N/A"


def fill_virt_data_uri(conn, active_hosts, virt_data: VirtData) -> None:
    """fill VirtData for one URI."""
    for host_id in active_hosts:
        dom = conn.lookupByID(host_id)
        virt_data.cpu_times.append(
            repr(
                int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000)
            )
            + "s"
        )
        xml_doc = minidom.parseString(dom.XMLDesc())
        virt_data.name.append(dom.name())

        mem_stats = dom.memoryStats()
        virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
        virt_data.mem_unused.append(size_abr(mem_stats["unused"], 1000))

        tree = ElementTree.fromstring(dom.XMLDesc())
        iface = tree.find("devices/interface/target").get("dev")
        stats = dom.interfaceStats(iface)
        virt_data.write_bytes.append(size_abr(stats[4], 1))
        virt_data.read_bytes.append(size_abr(stats[0], 1))

        disk_info = get_disk_info(xml_doc)
        image_name = disk_info["file"]
        _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
        virt_data.disk_reads.append(size_abr(rd_bytes, 1))
        virt_data.disk_writes.append(size_abr(wr_bytes, 1))

        network_info = get_network_info(xml_doc)
        virt_data.macs.append(network_info["address"])
        virt_data.ips.append(get_ip_by_arp(network_info["address"]))


def main() -> None:
    """entrypoint"""
    argparser = Argparser()
    virt_data = VirtData()
    for hv_host in argparser.args.uri:
        conn = libvirt.openReadOnly(hv_host)
        active_hosts = conn.listDomainsID()
        fill_virt_data_uri(conn, active_hosts, virt_data)

    lines = ffs(
        3,
        [
            "name",
            "cpu",
            "mem_actual",
            "mem_unused",
            "net_write_B",
            "net_read_B",
            "MAC",
            "IP",
            "IO_read_B",
            "IO_write_B",
        ],
        True,
        virt_data.name,
        virt_data.cpu_times,
        virt_data.mem_actual,
        virt_data.mem_unused,
        virt_data.write_bytes,
        virt_data.read_bytes,
        virt_data.macs,
        virt_data.ips,
        virt_data.disk_reads,
        virt_data.disk_writes,
    )
    for line in lines:
        print(line)
        # time.sleep(argparser.args.delay)


if __name__ == "__main__":
    main()