aboutsummaryrefslogblamecommitdiffstats
path: root/bin/virttop
blob: 34c93ba494509dd36ca68499e9dc30237e7b1926 (plain) (tree)
1
2
3
4
5
6
7
8
9







                                                                 
          
                  
           
          
 
             
                                    



                                                 









                                                          
                                             

                      







                                                                    



                                            
                            

















                                  




                                                               
                                                                     









                                                                           

                                                                               




                           

                                  
                                                               










                                                             












                                                                             
        


                             

 


                           

                                 
                                                     











                                                        
                                                                           


                                   
                                    



                                                     
                                                                               


                                    
                                                                           

























                                                                      
                                                                         



                                                          
                                                                         






                                                 
                                        


                        





                                                          


                






                                     

                                    
                                           
                                      
                                                                 










                                                                               





                                                                            














                                                                




                                                                      

 

                    

                           
                               
                                      

                                            





                                                                             
                                                                        





                                                   

                
          
         






                          

                  



                         
          

                        









                              

                                  


                      



                          
#!/usr/bin/env python
"""virt top"""

# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml  # type:ignore
# defusedxml.defuse_stdlib()
import argparse
import csv
import dataclasses
import enum
import sys

import typing
from xml.dom.minidom import Document
from defusedxml import ElementTree  # type:ignore
from defusedxml import minidom
import libvirt  # type:ignore


class Argparser:  # pylint: disable=too-few-public-methods
    """Command-line interface of the tool.

    Attributes:
        parser: The configured ``argparse.ArgumentParser``.
        args: The parsed namespace, exposing ``delay`` and ``uri``.
    """

    def __init__(
        self, argv: typing.Optional[typing.Sequence[str]] = None
    ) -> None:
        """Build the parser and parse the command line.

        Args:
            argv: Arguments to parse. ``None`` (the default) lets argparse
                read them from ``sys.argv``, which is what the script does.
        """
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            "--delay",
            "-d",
            type=float,
            help="The delay between updates",
            default=5,
        )
        # nargs="+" consumes whitespace-separated values, so the help text
        # must not advertise commas.
        self.parser.add_argument(
            "--uri",
            "-u",
            nargs="+",
            type=str,
            help="One or more libvirt URIs to connect to, separated by spaces",
            default=["qemu:///system"],
        )
        self.args = self.parser.parse_args(argv)


# pylint: disable=too-few-public-methods
class Colors:
    """Static ANSI escape sequences used to color the output.

    This is a plain constant namespace: every attribute is a ``str`` that can
    be concatenated directly with the text it decorates. It deliberately does
    not derive from ``enum.EnumType``; that name is the enum metaclass, only
    exists on Python 3.11+, and subclassing it would turn this class into a
    metaclass rather than an enumeration.
    """

    purple = "\033[95m"
    blue = "\033[94m"
    green = "\033[92m"
    yellow = "\033[93m"
    red = "\033[91m"
    grey = "\033[1;37m"
    darkgrey = "\033[1;30m"
    cyan = "\033[1;36m"
    # Attribute resets / text styles
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    # 256-color palette entries
    blueblue = "\x1b[38;5;24m"
    greenie = "\x1b[38;5;23m"
    goo = "\x1b[38;5;22m"


@dataclasses.dataclass
# pylint: disable=too-many-instance-attributes
class VirtData:
    """Holds the data that we collect to display to the user.

    Each field is one column of the table printed by main(): a list of
    already-formatted strings with one entry per domain.
    fill_virt_data_uri() appends to every column except ``vm_id``, which
    main() fills itself.
    """

    # Domain IDs as returned by conn.listDomainsID(), stored via repr()
    vm_id: typing.List[str] = dataclasses.field(default_factory=list)
    # Domain names (dom.name())
    name: typing.List[str] = dataclasses.field(default_factory=list)
    # Total CPU time in whole seconds with an "s" suffix
    cpu_times: typing.List[str] = dataclasses.field(default_factory=list)
    # "actual" entry of dom.memoryStats(), formatted by size_abr()
    mem_actual: typing.List[str] = dataclasses.field(default_factory=list)
    # "unused" entry of dom.memoryStats(), or "N/A" when the guest lacks it
    mem_unused: typing.List[str] = dataclasses.field(default_factory=list)
    # Element 4 of dom.interfaceStats(), formatted by size_abr()
    write_bytes: typing.List[str] = dataclasses.field(default_factory=list)
    # Element 0 of dom.interfaceStats(), formatted by size_abr()
    read_bytes: typing.List[str] = dataclasses.field(default_factory=list)
    # "address" attribute collected from the domain's interface XML
    macs: typing.List[str] = dataclasses.field(default_factory=list)
    # IP found for the MAC in the local ARP table, or "N/A"
    ips: typing.List[str] = dataclasses.field(default_factory=list)
    # rd_bytes from dom.blockStats(), formatted by size_abr()
    disk_reads: typing.List[str] = dataclasses.field(default_factory=list)
    # wr_bytes from dom.blockStats(), formatted by size_abr()
    disk_writes: typing.List[str] = dataclasses.field(default_factory=list)
    # dom.snapshotNum(), stored via repr()
    snapshot_counts: typing.List[str] = dataclasses.field(default_factory=list)
    # URI of the connection the domain was found on
    uri: typing.List[str] = dataclasses.field(default_factory=list)


def get_network_info(
    xml_doc: Document,
) -> typing.Dict[str, str]:
    """Collect the attributes of every child element of each <interface>.

    All attributes are merged into one flat dict keyed by attribute name, so
    when the same name occurs more than once the last occurrence wins.
    """
    collected = {}
    for iface in xml_doc.getElementsByTagName("interface"):
        for child in iface.childNodes:
            # Text and comment nodes have names such as "#text"; they carry
            # no attributes and are skipped.
            if child.nodeName.startswith("#"):
                continue
            attrs = child.attributes
            for key in attrs.keys():
                attribute = attrs[key]
                collected[attribute.name] = attribute.value
    return collected


def get_arp_table(path: str = "/proc/net/arp") -> typing.Dict[str, str]:
    """Get the ARP table. return a dict with MAC/IP as key/value pair.

    Args:
        path: File to read, laid out like ``/proc/net/arp``: one header line
            followed by space-aligned rows whose first column is the IP
            address and whose fourth column is the hardware (MAC) address.

    Returns:
        A mapping from MAC address to IP address. The header line and rows
        with fewer than four columns (for example blank lines) are ignored.

    Raises:
        OSError: If ``path`` cannot be opened.
    """
    result: typing.Dict[str, str] = {}
    with open(path, encoding="utf-8") as arp_table:
        reader = csv.reader(arp_table, skipinitialspace=True, delimiter=" ")
        # The first line only holds column titles; without skipping it the
        # words of the header end up in the table as a bogus entry.
        next(reader, None)
        for arp_entry in reader:
            if len(arp_entry) < 4:
                continue
            result[arp_entry[3]] = arp_entry[0]

    return result


def get_ip_from_arp_table(arp_table: typing.Dict[str, str], mac: str) -> str:
    """Look up the IP recorded for ``mac``; "N/A" when the MAC is unknown."""
    return arp_table.get(mac, "N/A")


def get_disk_info(
    xml_doc: Document,
) -> typing.Dict[str, str]:
    """Collect the attributes of every child element of each <disk>.

    The attributes of all disks are flattened into a single dict keyed by
    attribute name; a name seen again later replaces the earlier value.
    """
    merged: typing.Dict = {}
    for disk in xml_doc.getElementsByTagName("disk"):
        for element in disk.childNodes:
            # Names starting with "#" belong to text/comment nodes.
            if element.nodeName[:1] == "#":
                continue
            attributes = element.attributes
            merged.update(
                {
                    attributes[key].name: attributes[key].value
                    for key in attributes.keys()
                }
            )

    return merged


def ffs(offset: int, header_list: typing.List[str], numbered: bool, *args):
    """Format columns of strings as an aligned, ANSI-colored table.

    Args:
        offset: Extra padding added to the width of every column that has a
            header.
        header_list: One header per column in ``args``. The caller's list is
            left untouched; the "idx" header is added to a private copy.
        numbered: When true, prepend an "idx" column holding the 0-based row
            index.
        *args: The columns, each a sequence of strings. The last column
            determines the number of rows.

    Returns:
        A list of lines: the header line followed by one line per row. With
        empty columns only the header line is returned.

    Raises:
        IndexError: If there are more headers than columns, or a column is
            shorter than the last one.
    """
    # Work on a copy: inserting "idx" into the caller's list would add one
    # more "idx" header on every call that reuses the same list.
    headers = list(header_list)
    row_count = len(args[-1]) if args else 0
    widths: typing.List[int] = []

    if numbered:
        # Sized for the labels 1..row_count even though rows are labelled
        # from 0; the result is never too narrow.
        widths.append(
            max((len(repr(n)) for n in range(1, row_count + 1)), default=0)
        )
        headers.insert(0, "idx")

    # Cell widths are measured on repr(), which counts the quotes of a plain
    # string; kept as-is so the existing column layout does not change.
    # default=0 keeps empty columns from raising in max().
    for column in args:
        widths.append(max((len(repr(cell)) for cell in column), default=0))

    # Only columns that have a header get widened and padded by `offset`.
    padded = [
        max(len(header), width) + offset
        for header, width in zip(headers, widths)
    ]
    widths[: len(padded)] = padded

    header_style = Colors.greenie + Colors.BOLD
    lines = [
        "".join(
            header_style + header.ljust(widths[i]) + Colors.ENDC
            for i, header in enumerate(headers)
        )
    ]

    data_widths = widths[1:] if numbered else widths
    for row in range(row_count):
        cells = []
        if numbered:
            cells.append(
                Colors.goo
                + Colors.BOLD
                + repr(row).ljust(widths[0])
                + Colors.ENDC
            )
        cells.extend(
            Colors.blueblue + column[row].ljust(width) + Colors.ENDC
            for column, width in zip(args, data_widths)
        )
        lines.append("".join(cells))
    return lines


def size_abr(num: float, shift_by: float) -> str:
    """Rounds and abbreviates floats.

    Args:
        num: The raw value.
        shift_by: Factor ``num`` is multiplied by before abbreviating.

    Returns:
        ``repr()`` of the scaled value when it is below 1000; otherwise the
        value divided by the matching power of 1000, rounded to two decimals
        and suffixed with KB, MB, GB, TB or PB. "N/A" is returned for values
        of 1000**6 and above.
    """
    num = num * shift_by
    if num < 1000:
        return repr(num)
    # Decimal (power-of-1000) units. TB and PB are included because
    # cumulative I/O counters can legitimately exceed a terabyte.
    for exponent, unit in enumerate(("KB", "MB", "GB", "TB", "PB"), start=1):
        if num < 1000 ** (exponent + 1):
            return repr(round(num / 1000**exponent, 2)) + " " + unit
    return "N/A"


# pylint: disable=too-many-locals
def fill_virt_data_uri(
    conn: libvirt.virConnect,
    active_hosts: typing.List[int],
    virt_data: VirtData,
    arp_table: typing.Dict[str, str],
) -> None:
    """fill VirtData for one URI.

    Appends one entry per domain in ``active_hosts`` to every column of
    ``virt_data`` except ``vm_id``, which this function does not touch.

    Args:
        conn: Open libvirt connection the domains are looked up on.
        active_hosts: IDs of the domains to report.
        virt_data: Column store that is extended in place.
        arp_table: MAC-to-IP mapping used to resolve each domain's IP.

    A domain without a network interface, MAC address or file-backed disk
    gets "N/A" in the affected columns instead of aborting the whole run.
    """
    uri = conn.getURI()
    for host_id in active_hosts:
        dom = conn.lookupByID(host_id)
        virt_data.uri.append(uri)
        virt_data.snapshot_counts.append(repr(dom.snapshotNum()))
        virt_data.name.append(dom.name())

        # Shown as whole seconds; the 1e9 divisor presumes libvirt reports
        # cpu_time in nanoseconds.
        cpu_time = dom.getCPUStats(total=True)[0]["cpu_time"]
        virt_data.cpu_times.append(repr(int(cpu_time / 1_000_000_000)) + "s")

        mem_stats = dom.memoryStats()
        virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
        # BSD guests dont have unused memory
        unused = mem_stats.get("unused")
        virt_data.mem_unused.append(
            "N/A" if unused is None else size_abr(unused, 1000)
        )

        # Fetch the domain XML once and feed both parsers from it.
        xml_desc = dom.XMLDesc()
        xml_doc = minidom.parseString(xml_desc)
        target = ElementTree.fromstring(xml_desc).find(
            "devices/interface/target"
        )
        iface = None if target is None else target.get("dev")
        if iface is None:
            # No NIC (or no host-side device name): nothing to query.
            virt_data.write_bytes.append("N/A")
            virt_data.read_bytes.append("N/A")
        else:
            stats = dom.interfaceStats(iface)
            virt_data.write_bytes.append(size_abr(stats[4], 1))
            virt_data.read_bytes.append(size_abr(stats[0], 1))

        image_name = get_disk_info(xml_doc).get("file")
        if image_name is None:
            # No file-backed disk source in the domain XML.
            virt_data.disk_reads.append("N/A")
            virt_data.disk_writes.append("N/A")
        else:
            _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
            virt_data.disk_reads.append(size_abr(rd_bytes, 1))
            virt_data.disk_writes.append(size_abr(wr_bytes, 1))

        mac = get_network_info(xml_doc).get("address")
        if mac is None:
            virt_data.macs.append("N/A")
            virt_data.ips.append("N/A")
        else:
            virt_data.macs.append(mac)
            # TODO-this is obviously not going to work for remote URIs
            virt_data.ips.append(get_ip_from_arp_table(arp_table, mac))


def main() -> None:
    """entrypoint

    Connects read-only to every URI given on the command line, collects the
    statistics of all running domains and prints them as one table. Exits
    with status 1 when none of the URIs has a running domain.
    """
    argparser = Argparser()
    virt_data = VirtData()
    arp_table = get_arp_table()
    for hv_host in argparser.args.uri:
        conn = libvirt.openReadOnly(hv_host)
        try:
            active_hosts = conn.listDomainsID()
            if not active_hosts:
                # Other URIs may still have running domains.
                continue
            # Extend (not assign) so the ID column stays aligned with the
            # other columns across several URIs, and reuse the same ID
            # snapshot that the statistics are collected for.
            virt_data.vm_id.extend(repr(vm_id) for vm_id in active_hosts)
            fill_virt_data_uri(conn, active_hosts, virt_data, arp_table)
        finally:
            conn.close()

    if not virt_data.vm_id:
        print("no active VMs found.")
        sys.exit(1)

    lines = ffs(
        2,
        [
            "ID",
            "NAME",
            "CPU",
            "MEM_ACTUAL",
            "MEM_UNUSED",
            "NET_WRITE_B",
            "NET_READ_B",
            "MAC",
            "IP",
            "IO_READ_B",
            "IO_WRITE_B",
            "SNAPSHOTS",
            "URI",
        ],
        False,
        virt_data.vm_id,
        virt_data.name,
        virt_data.cpu_times,
        virt_data.mem_actual,
        virt_data.mem_unused,
        virt_data.write_bytes,
        virt_data.read_bytes,
        virt_data.macs,
        virt_data.ips,
        virt_data.disk_reads,
        virt_data.disk_writes,
        virt_data.snapshot_counts,
        virt_data.uri,
    )
    for line in lines:
        print(line)


# Run the tool only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()