aboutsummaryrefslogblamecommitdiffstats
path: root/bin/virttop
blob: be67a48e7eb3d2b0a4bbf0b40ea17498ca110199 (plain) (tree)
1
2
3
4
5
6
7
8
9







                                                                 
          
                  
           
             
          
           
             
                                    
 



                                                 
 






                                       








                                                          
                                             

                      







                                                                    



                                            
                            















                                  

                                 

 




                                                               
                                                                     









                                                                           

                                                                               




                           

                                  
                                                               










                                                             












                                                                             
        


                             

 


                           

                                 
                                                     











                                                        






                                                   


                                   
                                    

              











                                  

                    
                                                                           
 






                                                              
 












                                                                          




                                    
                                                                      

                                                              
                                                                     

                                                          
                                                                     





                                                 
                                        


                        





                                                          


                






                                     

                                    
                                           
                                      
                                                                 










                                                                               





                                                                            














                                                                




                                                                      

 

                    
                                                    
                           




























































                                                                             



                          
#!/usr/bin/env python
"""virt top"""

# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml  # type:ignore
# defusedxml.defuse_stdlib()
import argparse
import csv
import dataclasses
import enum
import signal
import sys
import time
import typing
from xml.dom.minidom import Document

from defusedxml import ElementTree  # type:ignore
from defusedxml import minidom
import libvirt  # type:ignore


# pylint: disable=unused-argument
def sig_handler_sigint(signum, frame):
    """Just to handle C-c gracefully"""
    print()
    sys.exit(0)


class Argparser:  # pylint: disable=too-few-public-methods
    """Argparser class."""

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            "--delay",
            "-d",
            type=float,
            help="The delay between updates",
            default=5,
        )
        self.parser.add_argument(
            "--uri",
            "-u",
            nargs="+",
            type=str,
            help="A list of URIs to connect to seperated by commas",
            default=["qemu:///system"],
        )
        self.args = self.parser.parse_args()


# pylint: disable=too-few-public-methods
class Colors(enum.EnumType):
    """static color definitions"""

    purple = "\033[95m"
    blue = "\033[94m"
    green = "\033[92m"
    yellow = "\033[93m"
    red = "\033[91m"
    grey = "\033[1;37m"
    darkgrey = "\033[1;30m"
    cyan = "\033[1;36m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    blueblue = "\x1b[38;5;24m"
    greenie = "\x1b[38;5;23m"
    goo = "\x1b[38;5;22m"
    screen_clear = "\033c\033[3J"
    hide_cursor = "\033[?25l"


@dataclasses.dataclass
# pylint: disable=too-many-instance-attributes
class VirtData:
    """Holds the data that we collect to display to the user"""

    vm_id: typing.List[str] = dataclasses.field(default_factory=list)
    name: typing.List[str] = dataclasses.field(default_factory=list)
    cpu_times: typing.List[str] = dataclasses.field(default_factory=list)
    mem_actual: typing.List[str] = dataclasses.field(default_factory=list)
    mem_unused: typing.List[str] = dataclasses.field(default_factory=list)
    write_bytes: typing.List[str] = dataclasses.field(default_factory=list)
    read_bytes: typing.List[str] = dataclasses.field(default_factory=list)
    macs: typing.List[str] = dataclasses.field(default_factory=list)
    ips: typing.List[str] = dataclasses.field(default_factory=list)
    disk_reads: typing.List[str] = dataclasses.field(default_factory=list)
    disk_writes: typing.List[str] = dataclasses.field(default_factory=list)
    snapshot_counts: typing.List[str] = dataclasses.field(default_factory=list)
    uri: typing.List[str] = dataclasses.field(default_factory=list)


def get_network_info(
    xml_doc: Document,
) -> typing.Dict[str, str]:
    """returns the network info"""
    result_dict = {}
    interface_types = xml_doc.getElementsByTagName("interface")
    for interface_type in interface_types:
        interface_nodes = interface_type.childNodes
        for interface_node in interface_nodes:
            if interface_node.nodeName[0:1] != "#":
                for attr in interface_node.attributes.keys():
                    result_dict[
                        interface_node.attributes[attr].name
                    ] = interface_node.attributes[attr].value
    return result_dict


def get_arp_table() -> typing.Dict[str, str]:
    """Get the ARP table. return a dict with MAC/IP as key/value pair."""
    result: typing.Dict[str, str] = {}
    with open("/proc/net/arp", encoding="utf-8") as arp_table:
        reader = csv.reader(arp_table, skipinitialspace=True, delimiter=" ")
        for arp_entry in reader:
            result[arp_entry[3]] = arp_entry[0]

    return result


def get_ip_from_arp_table(arp_table: typing.Dict[str, str], mac: str) -> str:
    """get IP from MAC address using the arp table"""
    try:
        return arp_table[mac]
    except KeyError:
        return "N/A"


def get_disk_info(
    xml_doc: Document,
) -> typing.Dict[str, str]:
    """returns the disk info"""
    result_dict: typing.Dict = {}
    disk_types = xml_doc.getElementsByTagName("disk")
    for disk_type in disk_types:
        disk_nodes = disk_type.childNodes
        for disk_node in disk_nodes:
            if disk_node.nodeName[0:1] != "#":
                for attr in disk_node.attributes.keys():
                    result_dict[
                        disk_node.attributes[attr].name
                    ] = disk_node.attributes[attr].value

    return result_dict


# pylint: disable=too-many-locals
def ffs(
    offset: int,
    header_list: typing.Optional[typing.List[str]],
    numbered: bool,
    *args,
):
    """A simple columnar printer"""
    max_column_width = []
    lines = []
    numbers_f: typing.List[int] = []
    dummy = []

    if sys.stdout.isatty():
        greenie = Colors.greenie
        bold = Colors.BOLD
        endc = Colors.ENDC
        goo = Colors.goo
        blueblue = Colors.blueblue
    else:
        greenie = ""
        bold = ""
        endc = ""
        goo = ""
        blueblue = ""

    for arg in args:
        max_column_width.append(max(len(repr(argette)) for argette in arg))

    if header_list is not None:
        if numbered:
            numbers_f.extend(range(1, len(args[-1]) + 1))
            max_column_width.append(
                max(len(repr(number)) for number in numbers_f)
            )
            header_list.insert(0, "idx")

        index = range(0, len(header_list))
        for header, width, i in zip(header_list, max_column_width, index):
            max_column_width[i] = max(len(header), width) + offset

        for i in index:
            dummy.append(
                greenie
                + bold
                + header_list[i].ljust(max_column_width[i])
                + endc
            )
        lines.append("".join(dummy))
        dummy.clear()

    index2 = range(0, len(args[-1]))
    for i in index2:
        if numbered:
            dummy.append(
                goo + bold + repr(i).ljust(max_column_width[0]) + endc
            )
            for arg, width in zip(args, max_column_width[1:]):
                dummy.append(blueblue + (arg[i]).ljust(width) + endc)
        else:
            for arg, width in zip(args, max_column_width):
                dummy.append(blueblue + (arg[i]).ljust(width) + endc)
        lines.append("".join(dummy))
        dummy.clear()
    return lines


def size_abr(num: float, shift_by: float) -> str:
    """Rounds and abbreviates floats."""
    num = num * shift_by
    if num < 1000:
        return repr(num)
    if num < 1_000_000.0:
        return repr(round(num / 1_000, 2)) + " KB"
    if num < 1_000_000_000:
        return repr(round(num / 1_000_000, 2)) + " MB"
    if num < 1_000_000_000_000:
        return repr(round(num / 1_000_000_000, 2)) + " GB"
    return "N/A"


# pylint: disable=too-many-locals
def fill_virt_data_uri(
    conn: libvirt.virConnect,
    active_hosts: typing.List[int],
    virt_data: VirtData,
    arp_table: typing.Dict[str, str],
) -> None:
    """fill VirtData for one URI."""
    for host_id in active_hosts:
        virt_data.uri.append(conn.getURI())
        dom = conn.lookupByID(host_id)
        virt_data.snapshot_counts.append(repr(dom.snapshotNum()))
        virt_data.cpu_times.append(
            repr(
                int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000)
            )
            + "s"
        )
        xml_doc = minidom.parseString(dom.XMLDesc())
        virt_data.name.append(dom.name())

        mem_stats = dom.memoryStats()
        virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))

        # BSD guests dont have unused memory
        try:
            virt_data.mem_unused.append(size_abr(mem_stats["unused"], 1000))
        except KeyError:
            virt_data.mem_unused.append("N/A")

        tree = ElementTree.fromstring(dom.XMLDesc())
        iface = tree.find("devices/interface/target").get("dev")
        stats = dom.interfaceStats(iface)
        virt_data.write_bytes.append(size_abr(stats[4], 1))
        virt_data.read_bytes.append(size_abr(stats[0], 1))

        disk_info = get_disk_info(xml_doc)
        image_name = disk_info["file"]
        _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
        virt_data.disk_reads.append(size_abr(rd_bytes, 1))
        virt_data.disk_writes.append(size_abr(wr_bytes, 1))

        network_info = get_network_info(xml_doc)
        virt_data.macs.append(network_info["address"])
        # virt_data.ips.append(get_ip_by_arp(network_info["address"]))
        # TODO-this is obviously not going to work for remote URIs
        virt_data.ips.append(
            get_ip_from_arp_table(arp_table, network_info["address"])
        )


def main() -> None:
    """entrypoint"""
    signal.signal(signal.SIGINT, sig_handler_sigint)
    argparser = Argparser()
    print(Colors.screen_clear, end="")
    while True:
        virt_data = VirtData()
        arp_table = get_arp_table()
        for hv_host in argparser.args.uri:
            conn = libvirt.openReadOnly(hv_host)
            active_hosts = conn.listDomainsID()
            if len(active_hosts) > 0:
                # pools = conn.listAllStoragePools()
                # networks = conn.listAllNetworks()
                # print([pool.name() for pool in conn.listAllStoragePools()])
                # print([net.name() for net in conn.listAllNetworks()])
                virt_data.vm_id = [
                    repr(vm_id) for vm_id in conn.listDomainsID()
                ]
                fill_virt_data_uri(conn, active_hosts, virt_data, arp_table)
                # for conn_id in conn.listAllDomains():
                #     print(conn_id.name())
                #     print(conn_id.state())
            else:
                print("no active VMs found.")
                sys.exit(1)

        lines = ffs(
            2,
            [
                "ID",
                "NAME",
                "CPU",
                "MEM_ACTUAL",
                "MEM_UNUSED",
                "NET_WRITE_B",
                "NET_READ_B",
                "MAC",
                "IP",
                "IO_READ_B",
                "IO_WRITE_B",
                "SNAPSHOTS",
                "URI",
            ],
            False,
            virt_data.vm_id,
            virt_data.name,
            virt_data.cpu_times,
            virt_data.mem_actual,
            virt_data.mem_unused,
            virt_data.write_bytes,
            virt_data.read_bytes,
            virt_data.macs,
            virt_data.ips,
            virt_data.disk_reads,
            virt_data.disk_writes,
            virt_data.snapshot_counts,
            virt_data.uri,
        )
        for line in lines:
            print(line)
        time.sleep(argparser.args.delay)
        # clears the screen
        print(Colors.screen_clear, end="")
        print(Colors.hide_cursor, end="")


if __name__ == "__main__":
    main()