From f9c5c641cd43f3badd704cf1ba929bb3a91c14e7 Mon Sep 17 00:00:00 2001 From: terminaldweller Date: Fri, 24 Mar 2023 01:30:06 +0330 Subject: virttop is now an interactive utility --- README.md | 34 ++++ pyproject.toml | 2 +- virttop.png | Bin 0 -> 57124 bytes virttop/virttop.py | 464 ++++++++++++++++++++++++++++++++++++----------------- 4 files changed, 350 insertions(+), 150 deletions(-) create mode 100644 virttop.png diff --git a/README.md b/README.md index edd5efe..15649cd 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,36 @@ # virttop a top like utility for libvirt + + +![Image](virttop.png) + +## How to get +```sh +pip install virttop +``` + +## Configfile +The default location for the config file is '~/.virttop.toml'. + +```toml +[color] +name_column_fg=23 +name_column_bg=0 +active_row_fg=24 +active_row_bg=0 +inactive_row_fg=244 +inactive_row_bg=0 +box_fg=29 +box_bg=0 +selected_fg=0 +selected_bg=36 +``` + +## Keybindings + +`j` and `k` and arrow keys move up and down.
+`g` moves to the top of the list.
+`G` moved to the bottom of the list.
+`r` runs an inactive domain.
+`s` shuts down a running domain.
+`d` destroy a running domain.
diff --git a/pyproject.toml b/pyproject.toml index e2c870b..1f4a7b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "virttop" -version = "0.1.1" +version = "0.2.0" description = "A top like utility for libvirt" authors = ["terminaldweller "] license = "GPL-3.0" diff --git a/virttop.png b/virttop.png new file mode 100644 index 0000000..57637d2 Binary files /dev/null and b/virttop.png differ diff --git a/virttop/virttop.py b/virttop/virttop.py index 0a1e4f0..99f2b61 100755 --- a/virttop/virttop.py +++ b/virttop/virttop.py @@ -1,14 +1,15 @@ #!/usr/bin/env python -"""virt top""" +"""virttop""" # ideally we would like to use the monkeypatch but it is untested # and experimental # import defusedxml # type:ignore # defusedxml.defuse_stdlib() import argparse +import asyncio import csv +import curses import dataclasses -import enum import os import signal import sys @@ -21,12 +22,32 @@ from xml.dom.minidom import Document # nosec from defusedxml import ElementTree # type:ignore from defusedxml import minidom import libvirt # type:ignore +import tomllib + + +def request_cred(credentials, sasl_user, sasl_pass): + """Credential handler.""" + for credential in credentials: + if credential[0] == libvirt.VIR_CRED_AUTHNAME: + credential[4] = sasl_user + elif credential[0] == libvirt.VIR_CRED_PASSPHRASE: + credential[4] = sasl_pass + return 0 + + +def do_cleanup(): + """Return the terminal to a sane state.""" + curses.nocbreak() + stdscr.keypad(False) + curses.echo() + curses.endwin() + print() # pylint: disable=unused-argument def sig_handler_sigint(signum, frame): - """Just to handle C-c gracefully""" - print() + """Just to handle C-c cleanly""" + do_cleanup() sys.exit(0) @@ -35,13 +56,6 @@ class Argparser: # pylint: disable=too-few-public-methods def __init__(self): self.parser = argparse.ArgumentParser() - self.parser.add_argument( - "--delay", - "-d", - type=float, - help="The delay between updates", - default=5, - ) self.parser.add_argument( "--uri", "-u", @@ -50,29 +64,29 @@ class Argparser: # pylint: disable=too-few-public-methods help="A list of URIs to connect to seperated by commas", default=["qemu:///system"], ) - self.args = self.parser.parse_args() - + self.parser.add_argument( + "--config", + "-c", + type=str, + help="Path to the config file", + default="~/.virttop.toml", + ) + self.parser.add_argument( + "--active", + "-a", + type=bool, + help="Show active VMs only", + default=False, + ) + self.parser.add_argument( + "--debug", + "-d", + type=bool, + help="Turn on debug logging", + default=False, + ) -# pylint: disable=too-few-public-methods -class Colors(enum.EnumType): - """static color definitions""" - - purple = "\033[95m" - blue = "\033[94m" - green = "\033[92m" - yellow = "\033[93m" - red = "\033[91m" - grey = "\033[1;37m" - darkgrey = "\033[1;30m" - cyan = "\033[1;36m" - ENDC = "\033[0m" - BOLD = "\033[1m" - UNDERLINE = "\033[4m" - blueblue = "\x1b[38;5;24m" - greenie = "\x1b[38;5;23m" - goo = "\x1b[38;5;22m" - screen_clear = "\033c\033[3J" - hide_cursor = "\033[?25l" + self.args = self.parser.parse_args() @dataclasses.dataclass @@ -95,9 +109,16 @@ class VirtData: uri: typing.List[str] = dataclasses.field(default_factory=list) memory_pool: typing.List[str] = dataclasses.field(default_factory=list) - pools: typing.List[libvirt.virStoragePool] = dataclasses.field( - default_factory=list - ) + pools: typing.List[libvirt.virStoragePool] = dataclasses.field(default_factory=list) + + +@dataclasses.dataclass +class ConfigData: + """Holds the config data""" + + sasl_user: str = dataclasses.field(default_factory=str) + sasl_password: str = dataclasses.field(default_factory=str) + color: typing.Dict[str, int] = dataclasses.field(default_factory=dict) def get_network_info( @@ -147,14 +168,15 @@ def get_disk_info( for disk_node in disk_nodes: if disk_node.nodeName[0:1] != "#": for attr in disk_node.attributes.keys(): - result_dict[ - disk_node.attributes[attr].name - ] = disk_node.attributes[attr].value + result_dict[disk_node.attributes[attr].name] = disk_node.attributes[ + attr + ].value return result_dict # pylint: disable=too-many-locals +# pylint: disable=too-many-branches def ffs( offset: int, header_list: typing.Optional[typing.List[str]], @@ -167,28 +189,13 @@ def ffs( numbers_f: typing.List[int] = [] dummy = [] - if sys.stdout.isatty(): - greenie = Colors.greenie - bold = Colors.BOLD - endc = Colors.ENDC - goo = Colors.goo - blueblue = Colors.blueblue - else: - greenie = "" - bold = "" - endc = "" - goo = "" - blueblue = "" - for arg in args: max_column_width.append(max(len(repr(argette)) for argette in arg)) if header_list is not None: if numbered: numbers_f.extend(range(1, len(args[-1]) + 1)) - max_column_width.append( - max(len(repr(number)) for number in numbers_f) - ) + max_column_width.append(max(len(repr(number)) for number in numbers_f)) header_list.insert(0, "idx") index = range(0, len(header_list)) @@ -196,26 +203,28 @@ def ffs( max_column_width[i] = max(len(header), width) + offset for i in index: - dummy.append( - greenie - + bold - + header_list[i].ljust(max_column_width[i]) - + endc - ) + dummy.append(header_list[i].ljust(max_column_width[i])) lines.append("".join(dummy)) dummy.clear() index2 = range(0, len(args[-1])) + active_count: int = 1 for i in index2: if numbered: - dummy.append( - goo + bold + repr(i).ljust(max_column_width[0]) + endc - ) + if int(args[0][i]) >= 0: + active_count += 1 + dummy.append(repr(i).ljust(max_column_width[0])) for arg, width in zip(args, max_column_width[1:]): - dummy.append(blueblue + (arg[i]).ljust(width) + endc) + if int(args[0][i]) >= 0: + dummy.append((arg[i]).ljust(width)) + else: + dummy.append((arg[i]).ljust(width)) else: for arg, width in zip(args, max_column_width): - dummy.append(blueblue + (arg[i]).ljust(width) + endc) + if int(args[0][i]) >= 0: + dummy.append((arg[i]).ljust(width)) + else: + dummy.append((arg[i]).ljust(width)) lines.append("".join(dummy)) dummy.clear() return lines @@ -238,103 +247,193 @@ def size_abr(num: float, shift_by: float) -> str: # pylint: disable=too-many-locals def fill_virt_data_uri( conn: libvirt.virConnect, - active_hosts: typing.List[int], + hosts: typing.List[libvirt.virDomain], virt_data: VirtData, arp_table: typing.Dict[str, str], + active_only: bool, ) -> None: """fill VirtData for one URI.""" - for host_id in active_hosts: - virt_data.uri.append(conn.getURI()) - dom = conn.lookupByID(host_id) - virt_data.snapshot_counts.append(repr(dom.snapshotNum())) + for host in hosts: try: - virt_data.cpu_times.append( - repr( - int( - dom.getCPUStats(total=True)[0]["cpu_time"] - / 1_000_000_000 + if active_only and host.ID() <= 0: + continue + virt_data.vm_id.append(repr(host.ID())) + virt_data.uri.append(conn.getURI()) + virt_data.name.append(host.name()) + dom = conn.lookupByName(host.name()) + + virt_data.snapshot_counts.append(repr(dom.snapshotNum())) + if host.ID() > 0: + try: + virt_data.cpu_times.append( + repr( + int( + dom.getCPUStats(total=True)[0]["cpu_time"] + / 1_000_000_000 + ) + ) + + "s" ) + except: + virt_data.cpu_times.append("n/a") + else: + virt_data.cpu_times.append("-") + + xml_doc = minidom.parseString(dom.XMLDesc()) + + if host.ID() >= 0: + mem_stats = dom.memoryStats() + if "actual" in mem_stats: + virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) + else: + virt_data.mem_actual.append("n/a") + + try: + virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000)) + except KeyError: + virt_data.mem_unused.append("N/A") + else: + virt_data.mem_actual.append("-") + virt_data.mem_unused.append("-") + + tree = ElementTree.fromstring(dom.XMLDesc()) + if host.ID() >= 0: + iface = tree.find("devices/interface/target").get("dev") + stats = dom.interfaceStats(iface) + virt_data.write_bytes.append(size_abr(stats[4], 1)) + virt_data.read_bytes.append(size_abr(stats[0], 1)) + else: + virt_data.write_bytes.append("-") + virt_data.read_bytes.append("-") + + found_the_pool: bool = False + disk = tree.find("devices/disk/source").get("file") + for pool in virt_data.pools: + if os.path.basename(disk) in pool.listVolumes(): + virt_data.memory_pool.append(pool.name()) + found_the_pool = True + # you could delete the pool but keep the volumes inside + # which results in a functional VM but it wont have a + # volume inside a pool that we can detect + if not found_the_pool: + virt_data.memory_pool.append("N/A") + + disk_info = get_disk_info(xml_doc) + image_name = disk_info["file"] + if host.ID() >= 0: + _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) + virt_data.disk_reads.append(size_abr(rd_bytes, 1)) + virt_data.disk_writes.append(size_abr(wr_bytes, 1)) + + network_info = get_network_info(xml_doc) + virt_data.macs.append(network_info["address"]) + # TODO-this is obviously not going to work for remote URIs + virt_data.ips.append( + get_ip_from_arp_table(arp_table, network_info["address"]) ) - + "s" - ) + else: + virt_data.disk_reads.append("-") + virt_data.disk_writes.append("-") + virt_data.macs.append("-") + virt_data.ips.append("-") except: - virt_data.cpu_times.append("n/a") - xml_doc = minidom.parseString(dom.XMLDesc()) - virt_data.name.append(dom.name()) + pass - mem_stats = dom.memoryStats() - if "actual" in mem_stats: - virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) - else: - virt_data.mem_actual.append("n/a") - # BSD guests dont support mem balloons? - try: - virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000)) - except KeyError: - virt_data.mem_unused.append("N/A") - - tree = ElementTree.fromstring(dom.XMLDesc()) - iface = tree.find("devices/interface/target").get("dev") - stats = dom.interfaceStats(iface) - virt_data.write_bytes.append(size_abr(stats[4], 1)) - virt_data.read_bytes.append(size_abr(stats[0], 1)) - - found_the_pool: bool = False - disk = tree.find("devices/disk/source").get("file") - for pool in virt_data.pools: - if os.path.basename(disk) in pool.listVolumes(): - virt_data.memory_pool.append(pool.name()) - found_the_pool = True - # you could delete the pool but keep the volumes inside - # which results in a functional VM but it wont have a - # volume inside a pool that we can detect - if not found_the_pool: - virt_data.memory_pool.append("N/A") - - disk_info = get_disk_info(xml_doc) - image_name = disk_info["file"] - _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) - virt_data.disk_reads.append(size_abr(rd_bytes, 1)) - virt_data.disk_writes.append(size_abr(wr_bytes, 1)) - - network_info = get_network_info(xml_doc) - virt_data.macs.append(network_info["address"]) - # virt_data.ips.append(get_ip_by_arp(network_info["address"])) - # TODO-this is obviously not going to work for remote URIs - virt_data.ips.append( - get_ip_from_arp_table(arp_table, network_info["address"]) - ) +def read_config(config_path) -> ConfigData: + """read the config""" + config_data = ConfigData() + try: + with open(os.path.expanduser(config_path), "rb") as conf_file: + data = tomllib.load(conf_file) + for key, value in data.items(): + match key: + case "sasl_user": + config_data.sasl_user = value + case "sasl_password": + config_data.sasl_password = value + case "color": + for key, value in value.items(): + config_data.color[key] = value + case _: + print(f"warning: unknown key, {key}, found.") + except FileNotFoundError: + pass + + return config_data + + +def curses_init() -> None: + """Initialize ncurses.""" + curses.start_color() + curses.use_default_colors() + curses.curs_set(False) + curses.noecho() + curses.cbreak() + stdscr.keypad(True) + curses.halfdelay(4) + + +def init_color_pairs(config_data: ConfigData) -> None: + """Initialize the curses color pairs.""" + curses.init_pair( + 1, config_data.color["name_column_fg"], config_data.color["name_column_bg"] + ) + curses.init_pair( + 2, config_data.color["active_row_fg"], config_data.color["active_row_bg"] + ) + curses.init_pair( + 3, config_data.color["inactive_row_fg"], config_data.color["inactive_row_bg"] + ) + curses.init_pair(4, config_data.color["box_fg"], config_data.color["box_bg"]) + curses.init_pair( + 5, config_data.color["selected_fg"], config_data.color["selected_bg"] + ) + +def get_visible_rows(max_rows: int, sel: int) -> typing.Tuple[int, int]: + """Returns the range of columns that will be visible based on max_rows.""" + win_min_row = sel + 2 - int(max_rows / 2) + win_min_row = max(win_min_row, 0) + win_max_row = sel + 2 + int(max_rows / 2) + win_max_row = max(win_max_row, max_rows) + return win_min_row, win_max_row -def main() -> None: + +async def main() -> None: """entrypoint""" + sel: int = 0 + current_row: int = 0 + current_visi: int = 0 + vm_name_ordered_list: typing.List[str] = [] + + curses_init() signal.signal(signal.SIGINT, sig_handler_sigint) argparser = Argparser() - print(Colors.screen_clear, end="") + config_data = read_config(argparser.args.config) + arp_table = get_arp_table() + init_color_pairs(config_data) while True: + stdscr.clear() virt_data = VirtData() - arp_table = get_arp_table() for hv_host in argparser.args.uri: - conn = libvirt.openReadOnly(hv_host) - active_hosts = conn.listDomainsID() - if len(active_hosts) > 0: + auth = [ + [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE], + request_cred, + None, + ] + # conn = libvirt.openAuth(hv_host, auth, libvirt.VIR_CONNECT_RO) + conn = libvirt.openAuth(hv_host, auth, 0) + hosts = conn.listAllDomains() + if len(hosts) > 0: virt_data.pools = conn.listAllStoragePools() - # for pool in virt_data.pools: - # print(pool.listVolumes()) - # networks = conn.listAllNetworks() - # print([pool.name() for pool in conn.listAllStoragePools()]) - # print([net.name() for net in conn.listAllNetworks()]) - virt_data.vm_id = [ - repr(vm_id) for vm_id in conn.listDomainsID() - ] - fill_virt_data_uri(conn, active_hosts, virt_data, arp_table) - # for conn_id in conn.listAllDomains(): - # print(conn_id.name()) - # print(conn_id.state()) + fill_virt_data_uri( + conn, hosts, virt_data, arp_table, argparser.args.active + ) else: print("no active VMs found.") - sys.exit(1) + time.sleep(3) + continue lines = ffs( 2, @@ -370,13 +469,80 @@ def main() -> None: virt_data.uri, virt_data.memory_pool, ) - for line in lines: - print(line) - time.sleep(argparser.args.delay) - # clears the screen - print(Colors.screen_clear, end="") - print(Colors.hide_cursor, end="") + stdscr.attron(curses.color_pair(4)) + stdscr.box() + stdscr.attroff(curses.color_pair(4)) + stdscr.addstr(1, 1, lines[0], curses.color_pair(1)) + + max_rows, _ = stdscr.getmaxyx() + + win_min_row, win_max_row = get_visible_rows(max_rows - 3, sel) + + current_row = 0 + current_visi = 0 + active_ids: typing.List[int] = [] + for count, (line, vm_id, vm_name) in enumerate( + zip(lines[1:], virt_data.vm_id, virt_data.name) + ): + if int(vm_id) >= 0: + vm_name_ordered_list.append(vm_name) + active_ids.append(count) + current_row += 1 + if current_row > win_max_row or current_row <= win_min_row: + continue + if sel == current_row - 1: + stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(5)) + else: + stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(2)) + current_visi += 1 + + inactive_count: int = len(active_ids) + if not argparser.args.active: + for count, (line, vm_name) in enumerate(zip(lines[1:], virt_data.name)): + if count not in active_ids: + vm_name_ordered_list.append(vm_name) + inactive_count += 1 + current_row += 1 + if current_row >= win_max_row or current_row < win_min_row: + continue + if sel == current_row - 1: + stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(5)) + else: + stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(3)) + current_visi += 1 + + char = stdscr.getch() + if char == ord("j") or char == curses.KEY_DOWN: + sel = (sel + 1) % (len(lines) - 1) + elif char == ord("k") or char == curses.KEY_UP: + sel = (sel - 1) % (len(lines) - 1) + elif char == ord("g"): + sel = 0 + elif char == ord("G"): + sel = len(lines) - 2 + elif char == ord("q"): + break + elif char == ord("d"): + dom = conn.lookupByName(vm_name_ordered_list[sel]) + dom.destroyFlags(flags=libvirt.VIR_DOMAIN_DESTROY_GRACEFUL) + elif char == ord("s"): + dom = conn.lookupByName(vm_name_ordered_list[sel]) + dom.shutdownFlags() + elif char == ord("r"): + dom = conn.lookupByName(vm_name_ordered_list[sel]) + dom.createWithFlags() + else: + pass + + stdscr.refresh() +# FIXME- the finally wipes the screen, effectively rendering the help option useless if __name__ == "__main__": - main() + stdscr = curses.initscr() + try: + asyncio.run(main()) + except Exception as e: + print(e) + finally: + do_cleanup() -- cgit v1.2.3