diff options
Diffstat (limited to '')
| -rw-r--r-- | README.md | 34 | ||||
| -rw-r--r-- | pyproject.toml | 2 | ||||
| -rw-r--r-- | virttop.png | bin | 0 -> 57124 bytes | |||
| -rwxr-xr-x | virttop/virttop.py | 464 | 
4 files changed, 350 insertions, 150 deletions
| @@ -1,2 +1,36 @@  # virttop  a top like utility for libvirt + + + + +## How to get +```sh +pip install virttop +``` + +## Configfile +The default location for the config file is '~/.virttop.toml'. + +```toml +[color] +name_column_fg=23 +name_column_bg=0 +active_row_fg=24 +active_row_bg=0 +inactive_row_fg=244 +inactive_row_bg=0 +box_fg=29 +box_bg=0 +selected_fg=0 +selected_bg=36 +``` + +## Keybindings + +`j` and `k` and arrow keys move up and down.</br> +`g` moves to the top of the list.</br> +`G` moved to the bottom of the list.</br> +`r` runs an inactive domain.</br> +`s` shuts down a running domain.</br> +`d` destroy a running domain.</br> diff --git a/pyproject.toml b/pyproject.toml index e2c870b..1f4a7b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@  [tool.poetry]  name = "virttop" -version = "0.1.1" +version = "0.2.0"  description = "A top like utility for libvirt"  authors = ["terminaldweller <devi@terminaldweller.com>"]  license = "GPL-3.0" diff --git a/virttop.png b/virttop.pngBinary files differ new file mode 100644 index 0000000..57637d2 --- /dev/null +++ b/virttop.png diff --git a/virttop/virttop.py b/virttop/virttop.py index 0a1e4f0..99f2b61 100755 --- a/virttop/virttop.py +++ b/virttop/virttop.py @@ -1,14 +1,15 @@  #!/usr/bin/env python -"""virt top""" +"""virttop"""  # ideally we would like to use the monkeypatch but it is untested  # and experimental  # import defusedxml  # type:ignore  # defusedxml.defuse_stdlib()  import argparse +import asyncio  import csv +import curses  import dataclasses -import enum  import os  import signal  import sys @@ -21,12 +22,32 @@ from xml.dom.minidom import Document  # nosec  from defusedxml import ElementTree  # type:ignore  from defusedxml import minidom  import libvirt  # type:ignore +import tomllib + + +def request_cred(credentials, sasl_user, sasl_pass): +    """Credential handler.""" +    for credential in credentials: +        if credential[0] == libvirt.VIR_CRED_AUTHNAME: +            credential[4] = sasl_user +        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE: +            credential[4] = sasl_pass +    return 0 + + +def do_cleanup(): +    """Return the terminal to a sane state.""" +    curses.nocbreak() +    stdscr.keypad(False) +    curses.echo() +    curses.endwin() +    print()  # pylint: disable=unused-argument  def sig_handler_sigint(signum, frame): -    """Just to handle C-c gracefully""" -    print() +    """Just to handle C-c cleanly""" +    do_cleanup()      sys.exit(0) @@ -36,13 +57,6 @@ class Argparser:  # pylint: disable=too-few-public-methods      def __init__(self):          self.parser = argparse.ArgumentParser()          self.parser.add_argument( -            "--delay", -            "-d", -            type=float, -            help="The delay between updates", -            default=5, -        ) -        self.parser.add_argument(              "--uri",              "-u",              nargs="+", @@ -50,29 +64,29 @@ class Argparser:  # pylint: disable=too-few-public-methods              help="A list of URIs to connect to seperated by commas",              default=["qemu:///system"],          ) -        self.args = self.parser.parse_args() - +        self.parser.add_argument( +            "--config", +            "-c", +            type=str, +            help="Path to the config file", +            default="~/.virttop.toml", +        ) +        self.parser.add_argument( +            "--active", +            "-a", +            type=bool, +            help="Show active VMs only", +            default=False, +        ) +        self.parser.add_argument( +            "--debug", +            "-d", +            type=bool, +            help="Turn on debug logging", +            default=False, +        ) -# pylint: disable=too-few-public-methods -class Colors(enum.EnumType): -    """static color definitions""" - -    purple = "\033[95m" -    blue = "\033[94m" -    green = "\033[92m" -    yellow = "\033[93m" -    red = "\033[91m" -    grey = "\033[1;37m" -    darkgrey = "\033[1;30m" -    cyan = "\033[1;36m" -    ENDC = "\033[0m" -    BOLD = "\033[1m" -    UNDERLINE = "\033[4m" -    blueblue = "\x1b[38;5;24m" -    greenie = "\x1b[38;5;23m" -    goo = "\x1b[38;5;22m" -    screen_clear = "\033c\033[3J" -    hide_cursor = "\033[?25l" +        self.args = self.parser.parse_args()  @dataclasses.dataclass @@ -95,9 +109,16 @@ class VirtData:      uri: typing.List[str] = dataclasses.field(default_factory=list)      memory_pool: typing.List[str] = dataclasses.field(default_factory=list) -    pools: typing.List[libvirt.virStoragePool] = dataclasses.field( -        default_factory=list -    ) +    pools: typing.List[libvirt.virStoragePool] = dataclasses.field(default_factory=list) + + +@dataclasses.dataclass +class ConfigData: +    """Holds the config data""" + +    sasl_user: str = dataclasses.field(default_factory=str) +    sasl_password: str = dataclasses.field(default_factory=str) +    color: typing.Dict[str, int] = dataclasses.field(default_factory=dict)  def get_network_info( @@ -147,14 +168,15 @@ def get_disk_info(          for disk_node in disk_nodes:              if disk_node.nodeName[0:1] != "#":                  for attr in disk_node.attributes.keys(): -                    result_dict[ -                        disk_node.attributes[attr].name -                    ] = disk_node.attributes[attr].value +                    result_dict[disk_node.attributes[attr].name] = disk_node.attributes[ +                        attr +                    ].value      return result_dict  # pylint: disable=too-many-locals +# pylint: disable=too-many-branches  def ffs(      offset: int,      header_list: typing.Optional[typing.List[str]], @@ -167,28 +189,13 @@ def ffs(      numbers_f: typing.List[int] = []      dummy = [] -    if sys.stdout.isatty(): -        greenie = Colors.greenie -        bold = Colors.BOLD -        endc = Colors.ENDC -        goo = Colors.goo -        blueblue = Colors.blueblue -    else: -        greenie = "" -        bold = "" -        endc = "" -        goo = "" -        blueblue = "" -      for arg in args:          max_column_width.append(max(len(repr(argette)) for argette in arg))      if header_list is not None:          if numbered:              numbers_f.extend(range(1, len(args[-1]) + 1)) -            max_column_width.append( -                max(len(repr(number)) for number in numbers_f) -            ) +            max_column_width.append(max(len(repr(number)) for number in numbers_f))              header_list.insert(0, "idx")          index = range(0, len(header_list)) @@ -196,26 +203,28 @@ def ffs(              max_column_width[i] = max(len(header), width) + offset          for i in index: -            dummy.append( -                greenie -                + bold -                + header_list[i].ljust(max_column_width[i]) -                + endc -            ) +            dummy.append(header_list[i].ljust(max_column_width[i]))          lines.append("".join(dummy))          dummy.clear()      index2 = range(0, len(args[-1])) +    active_count: int = 1      for i in index2:          if numbered: -            dummy.append( -                goo + bold + repr(i).ljust(max_column_width[0]) + endc -            ) +            if int(args[0][i]) >= 0: +                active_count += 1 +            dummy.append(repr(i).ljust(max_column_width[0]))              for arg, width in zip(args, max_column_width[1:]): -                dummy.append(blueblue + (arg[i]).ljust(width) + endc) +                if int(args[0][i]) >= 0: +                    dummy.append((arg[i]).ljust(width)) +                else: +                    dummy.append((arg[i]).ljust(width))          else:              for arg, width in zip(args, max_column_width): -                dummy.append(blueblue + (arg[i]).ljust(width) + endc) +                if int(args[0][i]) >= 0: +                    dummy.append((arg[i]).ljust(width)) +                else: +                    dummy.append((arg[i]).ljust(width))          lines.append("".join(dummy))          dummy.clear()      return lines @@ -238,103 +247,193 @@ def size_abr(num: float, shift_by: float) -> str:  # pylint: disable=too-many-locals  def fill_virt_data_uri(      conn: libvirt.virConnect, -    active_hosts: typing.List[int], +    hosts: typing.List[libvirt.virDomain],      virt_data: VirtData,      arp_table: typing.Dict[str, str], +    active_only: bool,  ) -> None:      """fill VirtData for one URI.""" -    for host_id in active_hosts: -        virt_data.uri.append(conn.getURI()) -        dom = conn.lookupByID(host_id) -        virt_data.snapshot_counts.append(repr(dom.snapshotNum())) +    for host in hosts:          try: -            virt_data.cpu_times.append( -                repr( -                    int( -                        dom.getCPUStats(total=True)[0]["cpu_time"] -                        / 1_000_000_000 +            if active_only and host.ID() <= 0: +                continue +            virt_data.vm_id.append(repr(host.ID())) +            virt_data.uri.append(conn.getURI()) +            virt_data.name.append(host.name()) +            dom = conn.lookupByName(host.name()) + +            virt_data.snapshot_counts.append(repr(dom.snapshotNum())) +            if host.ID() > 0: +                try: +                    virt_data.cpu_times.append( +                        repr( +                            int( +                                dom.getCPUStats(total=True)[0]["cpu_time"] +                                / 1_000_000_000 +                            ) +                        ) +                        + "s"                      ) +                except: +                    virt_data.cpu_times.append("n/a") +            else: +                virt_data.cpu_times.append("-") + +            xml_doc = minidom.parseString(dom.XMLDesc()) + +            if host.ID() >= 0: +                mem_stats = dom.memoryStats() +                if "actual" in mem_stats: +                    virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) +                else: +                    virt_data.mem_actual.append("n/a") + +                try: +                    virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000)) +                except KeyError: +                    virt_data.mem_unused.append("N/A") +            else: +                virt_data.mem_actual.append("-") +                virt_data.mem_unused.append("-") + +            tree = ElementTree.fromstring(dom.XMLDesc()) +            if host.ID() >= 0: +                iface = tree.find("devices/interface/target").get("dev") +                stats = dom.interfaceStats(iface) +                virt_data.write_bytes.append(size_abr(stats[4], 1)) +                virt_data.read_bytes.append(size_abr(stats[0], 1)) +            else: +                virt_data.write_bytes.append("-") +                virt_data.read_bytes.append("-") + +            found_the_pool: bool = False +            disk = tree.find("devices/disk/source").get("file") +            for pool in virt_data.pools: +                if os.path.basename(disk) in pool.listVolumes(): +                    virt_data.memory_pool.append(pool.name()) +                    found_the_pool = True +            # you could delete the pool but keep the volumes inside +            # which results in a functional VM but it wont have a +            # volume inside a pool that we can detect +            if not found_the_pool: +                virt_data.memory_pool.append("N/A") + +            disk_info = get_disk_info(xml_doc) +            image_name = disk_info["file"] +            if host.ID() >= 0: +                _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) +                virt_data.disk_reads.append(size_abr(rd_bytes, 1)) +                virt_data.disk_writes.append(size_abr(wr_bytes, 1)) + +                network_info = get_network_info(xml_doc) +                virt_data.macs.append(network_info["address"]) +                # TODO-this is obviously not going to work for remote URIs +                virt_data.ips.append( +                    get_ip_from_arp_table(arp_table, network_info["address"])                  ) -                + "s" -            ) +            else: +                virt_data.disk_reads.append("-") +                virt_data.disk_writes.append("-") +                virt_data.macs.append("-") +                virt_data.ips.append("-")          except: -            virt_data.cpu_times.append("n/a") -        xml_doc = minidom.parseString(dom.XMLDesc()) -        virt_data.name.append(dom.name()) +            pass -        mem_stats = dom.memoryStats() -        if "actual" in mem_stats: -            virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) -        else: -            virt_data.mem_actual.append("n/a") -        # BSD guests dont support mem balloons? -        try: -            virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000)) -        except KeyError: -            virt_data.mem_unused.append("N/A") - -        tree = ElementTree.fromstring(dom.XMLDesc()) -        iface = tree.find("devices/interface/target").get("dev") -        stats = dom.interfaceStats(iface) -        virt_data.write_bytes.append(size_abr(stats[4], 1)) -        virt_data.read_bytes.append(size_abr(stats[0], 1)) - -        found_the_pool: bool = False -        disk = tree.find("devices/disk/source").get("file") -        for pool in virt_data.pools: -            if os.path.basename(disk) in pool.listVolumes(): -                virt_data.memory_pool.append(pool.name()) -                found_the_pool = True -        # you could delete the pool but keep the volumes inside -        # which results in a functional VM but it wont have a -        # volume inside a pool that we can detect -        if not found_the_pool: -            virt_data.memory_pool.append("N/A") - -        disk_info = get_disk_info(xml_doc) -        image_name = disk_info["file"] -        _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) -        virt_data.disk_reads.append(size_abr(rd_bytes, 1)) -        virt_data.disk_writes.append(size_abr(wr_bytes, 1)) - -        network_info = get_network_info(xml_doc) -        virt_data.macs.append(network_info["address"]) -        # virt_data.ips.append(get_ip_by_arp(network_info["address"])) -        # TODO-this is obviously not going to work for remote URIs -        virt_data.ips.append( -            get_ip_from_arp_table(arp_table, network_info["address"]) -        ) +def read_config(config_path) -> ConfigData: +    """read the config""" +    config_data = ConfigData() +    try: +        with open(os.path.expanduser(config_path), "rb") as conf_file: +            data = tomllib.load(conf_file) +            for key, value in data.items(): +                match key: +                    case "sasl_user": +                        config_data.sasl_user = value +                    case "sasl_password": +                        config_data.sasl_password = value +                    case "color": +                        for key, value in value.items(): +                            config_data.color[key] = value +                    case _: +                        print(f"warning: unknown key, {key}, found.") +    except FileNotFoundError: +        pass + +    return config_data + + +def curses_init() -> None: +    """Initialize ncurses.""" +    curses.start_color() +    curses.use_default_colors() +    curses.curs_set(False) +    curses.noecho() +    curses.cbreak() +    stdscr.keypad(True) +    curses.halfdelay(4) + + +def init_color_pairs(config_data: ConfigData) -> None: +    """Initialize the curses color pairs.""" +    curses.init_pair( +        1, config_data.color["name_column_fg"], config_data.color["name_column_bg"] +    ) +    curses.init_pair( +        2, config_data.color["active_row_fg"], config_data.color["active_row_bg"] +    ) +    curses.init_pair( +        3, config_data.color["inactive_row_fg"], config_data.color["inactive_row_bg"] +    ) +    curses.init_pair(4, config_data.color["box_fg"], config_data.color["box_bg"]) +    curses.init_pair( +        5, config_data.color["selected_fg"], config_data.color["selected_bg"] +    ) + +def get_visible_rows(max_rows: int, sel: int) -> typing.Tuple[int, int]: +    """Returns the range of columns that will be visible based on max_rows.""" +    win_min_row = sel + 2 - int(max_rows / 2) +    win_min_row = max(win_min_row, 0) +    win_max_row = sel + 2 + int(max_rows / 2) +    win_max_row = max(win_max_row, max_rows) +    return win_min_row, win_max_row -def main() -> None: + +async def main() -> None:      """entrypoint""" +    sel: int = 0 +    current_row: int = 0 +    current_visi: int = 0 +    vm_name_ordered_list: typing.List[str] = [] + +    curses_init()      signal.signal(signal.SIGINT, sig_handler_sigint)      argparser = Argparser() -    print(Colors.screen_clear, end="") +    config_data = read_config(argparser.args.config) +    arp_table = get_arp_table() +    init_color_pairs(config_data)      while True: +        stdscr.clear()          virt_data = VirtData() -        arp_table = get_arp_table()          for hv_host in argparser.args.uri: -            conn = libvirt.openReadOnly(hv_host) -            active_hosts = conn.listDomainsID() -            if len(active_hosts) > 0: +            auth = [ +                [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE], +                request_cred, +                None, +            ] +            # conn = libvirt.openAuth(hv_host, auth, libvirt.VIR_CONNECT_RO) +            conn = libvirt.openAuth(hv_host, auth, 0) +            hosts = conn.listAllDomains() +            if len(hosts) > 0:                  virt_data.pools = conn.listAllStoragePools() -                # for pool in virt_data.pools: -                #     print(pool.listVolumes()) -                # networks = conn.listAllNetworks() -                # print([pool.name() for pool in conn.listAllStoragePools()]) -                # print([net.name() for net in conn.listAllNetworks()]) -                virt_data.vm_id = [ -                    repr(vm_id) for vm_id in conn.listDomainsID() -                ] -                fill_virt_data_uri(conn, active_hosts, virt_data, arp_table) -                # for conn_id in conn.listAllDomains(): -                #     print(conn_id.name()) -                #     print(conn_id.state()) +                fill_virt_data_uri( +                    conn, hosts, virt_data, arp_table, argparser.args.active +                )              else:                  print("no active VMs found.") -                sys.exit(1) +                time.sleep(3) +                continue          lines = ffs(              2, @@ -370,13 +469,80 @@ def main() -> None:              virt_data.uri,              virt_data.memory_pool,          ) -        for line in lines: -            print(line) -        time.sleep(argparser.args.delay) -        # clears the screen -        print(Colors.screen_clear, end="") -        print(Colors.hide_cursor, end="") +        stdscr.attron(curses.color_pair(4)) +        stdscr.box() +        stdscr.attroff(curses.color_pair(4)) +        stdscr.addstr(1, 1, lines[0], curses.color_pair(1)) + +        max_rows, _ = stdscr.getmaxyx() + +        win_min_row, win_max_row = get_visible_rows(max_rows - 3, sel) + +        current_row = 0 +        current_visi = 0 +        active_ids: typing.List[int] = [] +        for count, (line, vm_id, vm_name) in enumerate( +            zip(lines[1:], virt_data.vm_id, virt_data.name) +        ): +            if int(vm_id) >= 0: +                vm_name_ordered_list.append(vm_name) +                active_ids.append(count) +                current_row += 1 +                if current_row > win_max_row or current_row <= win_min_row: +                    continue +                if sel == current_row - 1: +                    stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(5)) +                else: +                    stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(2)) +                current_visi += 1 + +        inactive_count: int = len(active_ids) +        if not argparser.args.active: +            for count, (line, vm_name) in enumerate(zip(lines[1:], virt_data.name)): +                if count not in active_ids: +                    vm_name_ordered_list.append(vm_name) +                    inactive_count += 1 +                    current_row += 1 +                    if current_row >= win_max_row or current_row < win_min_row: +                        continue +                    if sel == current_row - 1: +                        stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(5)) +                    else: +                        stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(3)) +                    current_visi += 1 + +        char = stdscr.getch() +        if char == ord("j") or char == curses.KEY_DOWN: +            sel = (sel + 1) % (len(lines) - 1) +        elif char == ord("k") or char == curses.KEY_UP: +            sel = (sel - 1) % (len(lines) - 1) +        elif char == ord("g"): +            sel = 0 +        elif char == ord("G"): +            sel = len(lines) - 2 +        elif char == ord("q"): +            break +        elif char == ord("d"): +            dom = conn.lookupByName(vm_name_ordered_list[sel]) +            dom.destroyFlags(flags=libvirt.VIR_DOMAIN_DESTROY_GRACEFUL) +        elif char == ord("s"): +            dom = conn.lookupByName(vm_name_ordered_list[sel]) +            dom.shutdownFlags() +        elif char == ord("r"): +            dom = conn.lookupByName(vm_name_ordered_list[sel]) +            dom.createWithFlags() +        else: +            pass + +        stdscr.refresh() +# FIXME- the finally wipes the screen, effectively rendering the help option useless  if __name__ == "__main__": -    main() +    stdscr = curses.initscr() +    try: +        asyncio.run(main()) +    except Exception as e: +        print(e) +    finally: +        do_cleanup() | 
