aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md34
-rw-r--r--pyproject.toml2
-rw-r--r--virttop.pngbin0 -> 57124 bytes
-rwxr-xr-xvirttop/virttop.py464
4 files changed, 350 insertions, 150 deletions
diff --git a/README.md b/README.md
index edd5efe..15649cd 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,36 @@
# virttop
a top like utility for libvirt
+
+
+![Image](virttop.png)
+
+## How to get
+```sh
+pip install virttop
+```
+
+## Configfile
+The default location for the config file is '~/.virttop.toml'.
+
+```toml
+[color]
+name_column_fg=23
+name_column_bg=0
+active_row_fg=24
+active_row_bg=0
+inactive_row_fg=244
+inactive_row_bg=0
+box_fg=29
+box_bg=0
+selected_fg=0
+selected_bg=36
+```
+
+## Keybindings
+
+`j` and `k` and arrow keys move up and down.</br>
+`g` moves to the top of the list.</br>
+`G` moved to the bottom of the list.</br>
+`r` runs an inactive domain.</br>
+`s` shuts down a running domain.</br>
+`d` destroy a running domain.</br>
diff --git a/pyproject.toml b/pyproject.toml
index e2c870b..1f4a7b6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "virttop"
-version = "0.1.1"
+version = "0.2.0"
description = "A top like utility for libvirt"
authors = ["terminaldweller <devi@terminaldweller.com>"]
license = "GPL-3.0"
diff --git a/virttop.png b/virttop.png
new file mode 100644
index 0000000..57637d2
--- /dev/null
+++ b/virttop.png
Binary files differ
diff --git a/virttop/virttop.py b/virttop/virttop.py
index 0a1e4f0..99f2b61 100755
--- a/virttop/virttop.py
+++ b/virttop/virttop.py
@@ -1,14 +1,15 @@
#!/usr/bin/env python
-"""virt top"""
+"""virttop"""
# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml # type:ignore
# defusedxml.defuse_stdlib()
import argparse
+import asyncio
import csv
+import curses
import dataclasses
-import enum
import os
import signal
import sys
@@ -21,12 +22,32 @@ from xml.dom.minidom import Document # nosec
from defusedxml import ElementTree # type:ignore
from defusedxml import minidom
import libvirt # type:ignore
+import tomllib
+
+
+def request_cred(credentials, sasl_user, sasl_pass):
+ """Credential handler."""
+ for credential in credentials:
+ if credential[0] == libvirt.VIR_CRED_AUTHNAME:
+ credential[4] = sasl_user
+ elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
+ credential[4] = sasl_pass
+ return 0
+
+
+def do_cleanup():
+ """Return the terminal to a sane state."""
+ curses.nocbreak()
+ stdscr.keypad(False)
+ curses.echo()
+ curses.endwin()
+ print()
# pylint: disable=unused-argument
def sig_handler_sigint(signum, frame):
- """Just to handle C-c gracefully"""
- print()
+ """Just to handle C-c cleanly"""
+ do_cleanup()
sys.exit(0)
@@ -36,13 +57,6 @@ class Argparser: # pylint: disable=too-few-public-methods
def __init__(self):
self.parser = argparse.ArgumentParser()
self.parser.add_argument(
- "--delay",
- "-d",
- type=float,
- help="The delay between updates",
- default=5,
- )
- self.parser.add_argument(
"--uri",
"-u",
nargs="+",
@@ -50,29 +64,29 @@ class Argparser: # pylint: disable=too-few-public-methods
help="A list of URIs to connect to seperated by commas",
default=["qemu:///system"],
)
- self.args = self.parser.parse_args()
-
+ self.parser.add_argument(
+ "--config",
+ "-c",
+ type=str,
+ help="Path to the config file",
+ default="~/.virttop.toml",
+ )
+ self.parser.add_argument(
+ "--active",
+ "-a",
+ type=bool,
+ help="Show active VMs only",
+ default=False,
+ )
+ self.parser.add_argument(
+ "--debug",
+ "-d",
+ type=bool,
+ help="Turn on debug logging",
+ default=False,
+ )
-# pylint: disable=too-few-public-methods
-class Colors(enum.EnumType):
- """static color definitions"""
-
- purple = "\033[95m"
- blue = "\033[94m"
- green = "\033[92m"
- yellow = "\033[93m"
- red = "\033[91m"
- grey = "\033[1;37m"
- darkgrey = "\033[1;30m"
- cyan = "\033[1;36m"
- ENDC = "\033[0m"
- BOLD = "\033[1m"
- UNDERLINE = "\033[4m"
- blueblue = "\x1b[38;5;24m"
- greenie = "\x1b[38;5;23m"
- goo = "\x1b[38;5;22m"
- screen_clear = "\033c\033[3J"
- hide_cursor = "\033[?25l"
+ self.args = self.parser.parse_args()
@dataclasses.dataclass
@@ -95,9 +109,16 @@ class VirtData:
uri: typing.List[str] = dataclasses.field(default_factory=list)
memory_pool: typing.List[str] = dataclasses.field(default_factory=list)
- pools: typing.List[libvirt.virStoragePool] = dataclasses.field(
- default_factory=list
- )
+ pools: typing.List[libvirt.virStoragePool] = dataclasses.field(default_factory=list)
+
+
+@dataclasses.dataclass
+class ConfigData:
+ """Holds the config data"""
+
+ sasl_user: str = dataclasses.field(default_factory=str)
+ sasl_password: str = dataclasses.field(default_factory=str)
+ color: typing.Dict[str, int] = dataclasses.field(default_factory=dict)
def get_network_info(
@@ -147,14 +168,15 @@ def get_disk_info(
for disk_node in disk_nodes:
if disk_node.nodeName[0:1] != "#":
for attr in disk_node.attributes.keys():
- result_dict[
- disk_node.attributes[attr].name
- ] = disk_node.attributes[attr].value
+ result_dict[disk_node.attributes[attr].name] = disk_node.attributes[
+ attr
+ ].value
return result_dict
# pylint: disable=too-many-locals
+# pylint: disable=too-many-branches
def ffs(
offset: int,
header_list: typing.Optional[typing.List[str]],
@@ -167,28 +189,13 @@ def ffs(
numbers_f: typing.List[int] = []
dummy = []
- if sys.stdout.isatty():
- greenie = Colors.greenie
- bold = Colors.BOLD
- endc = Colors.ENDC
- goo = Colors.goo
- blueblue = Colors.blueblue
- else:
- greenie = ""
- bold = ""
- endc = ""
- goo = ""
- blueblue = ""
-
for arg in args:
max_column_width.append(max(len(repr(argette)) for argette in arg))
if header_list is not None:
if numbered:
numbers_f.extend(range(1, len(args[-1]) + 1))
- max_column_width.append(
- max(len(repr(number)) for number in numbers_f)
- )
+ max_column_width.append(max(len(repr(number)) for number in numbers_f))
header_list.insert(0, "idx")
index = range(0, len(header_list))
@@ -196,26 +203,28 @@ def ffs(
max_column_width[i] = max(len(header), width) + offset
for i in index:
- dummy.append(
- greenie
- + bold
- + header_list[i].ljust(max_column_width[i])
- + endc
- )
+ dummy.append(header_list[i].ljust(max_column_width[i]))
lines.append("".join(dummy))
dummy.clear()
index2 = range(0, len(args[-1]))
+ active_count: int = 1
for i in index2:
if numbered:
- dummy.append(
- goo + bold + repr(i).ljust(max_column_width[0]) + endc
- )
+ if int(args[0][i]) >= 0:
+ active_count += 1
+ dummy.append(repr(i).ljust(max_column_width[0]))
for arg, width in zip(args, max_column_width[1:]):
- dummy.append(blueblue + (arg[i]).ljust(width) + endc)
+ if int(args[0][i]) >= 0:
+ dummy.append((arg[i]).ljust(width))
+ else:
+ dummy.append((arg[i]).ljust(width))
else:
for arg, width in zip(args, max_column_width):
- dummy.append(blueblue + (arg[i]).ljust(width) + endc)
+ if int(args[0][i]) >= 0:
+ dummy.append((arg[i]).ljust(width))
+ else:
+ dummy.append((arg[i]).ljust(width))
lines.append("".join(dummy))
dummy.clear()
return lines
@@ -238,103 +247,193 @@ def size_abr(num: float, shift_by: float) -> str:
# pylint: disable=too-many-locals
def fill_virt_data_uri(
conn: libvirt.virConnect,
- active_hosts: typing.List[int],
+ hosts: typing.List[libvirt.virDomain],
virt_data: VirtData,
arp_table: typing.Dict[str, str],
+ active_only: bool,
) -> None:
"""fill VirtData for one URI."""
- for host_id in active_hosts:
- virt_data.uri.append(conn.getURI())
- dom = conn.lookupByID(host_id)
- virt_data.snapshot_counts.append(repr(dom.snapshotNum()))
+ for host in hosts:
try:
- virt_data.cpu_times.append(
- repr(
- int(
- dom.getCPUStats(total=True)[0]["cpu_time"]
- / 1_000_000_000
+ if active_only and host.ID() <= 0:
+ continue
+ virt_data.vm_id.append(repr(host.ID()))
+ virt_data.uri.append(conn.getURI())
+ virt_data.name.append(host.name())
+ dom = conn.lookupByName(host.name())
+
+ virt_data.snapshot_counts.append(repr(dom.snapshotNum()))
+ if host.ID() > 0:
+ try:
+ virt_data.cpu_times.append(
+ repr(
+ int(
+ dom.getCPUStats(total=True)[0]["cpu_time"]
+ / 1_000_000_000
+ )
+ )
+ + "s"
)
+ except:
+ virt_data.cpu_times.append("n/a")
+ else:
+ virt_data.cpu_times.append("-")
+
+ xml_doc = minidom.parseString(dom.XMLDesc())
+
+ if host.ID() >= 0:
+ mem_stats = dom.memoryStats()
+ if "actual" in mem_stats:
+ virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
+ else:
+ virt_data.mem_actual.append("n/a")
+
+ try:
+ virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000))
+ except KeyError:
+ virt_data.mem_unused.append("N/A")
+ else:
+ virt_data.mem_actual.append("-")
+ virt_data.mem_unused.append("-")
+
+ tree = ElementTree.fromstring(dom.XMLDesc())
+ if host.ID() >= 0:
+ iface = tree.find("devices/interface/target").get("dev")
+ stats = dom.interfaceStats(iface)
+ virt_data.write_bytes.append(size_abr(stats[4], 1))
+ virt_data.read_bytes.append(size_abr(stats[0], 1))
+ else:
+ virt_data.write_bytes.append("-")
+ virt_data.read_bytes.append("-")
+
+ found_the_pool: bool = False
+ disk = tree.find("devices/disk/source").get("file")
+ for pool in virt_data.pools:
+ if os.path.basename(disk) in pool.listVolumes():
+ virt_data.memory_pool.append(pool.name())
+ found_the_pool = True
+ # you could delete the pool but keep the volumes inside
+ # which results in a functional VM but it wont have a
+ # volume inside a pool that we can detect
+ if not found_the_pool:
+ virt_data.memory_pool.append("N/A")
+
+ disk_info = get_disk_info(xml_doc)
+ image_name = disk_info["file"]
+ if host.ID() >= 0:
+ _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
+ virt_data.disk_reads.append(size_abr(rd_bytes, 1))
+ virt_data.disk_writes.append(size_abr(wr_bytes, 1))
+
+ network_info = get_network_info(xml_doc)
+ virt_data.macs.append(network_info["address"])
+ # TODO-this is obviously not going to work for remote URIs
+ virt_data.ips.append(
+ get_ip_from_arp_table(arp_table, network_info["address"])
)
- + "s"
- )
+ else:
+ virt_data.disk_reads.append("-")
+ virt_data.disk_writes.append("-")
+ virt_data.macs.append("-")
+ virt_data.ips.append("-")
except:
- virt_data.cpu_times.append("n/a")
- xml_doc = minidom.parseString(dom.XMLDesc())
- virt_data.name.append(dom.name())
+ pass
- mem_stats = dom.memoryStats()
- if "actual" in mem_stats:
- virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
- else:
- virt_data.mem_actual.append("n/a")
- # BSD guests dont support mem balloons?
- try:
- virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000))
- except KeyError:
- virt_data.mem_unused.append("N/A")
-
- tree = ElementTree.fromstring(dom.XMLDesc())
- iface = tree.find("devices/interface/target").get("dev")
- stats = dom.interfaceStats(iface)
- virt_data.write_bytes.append(size_abr(stats[4], 1))
- virt_data.read_bytes.append(size_abr(stats[0], 1))
-
- found_the_pool: bool = False
- disk = tree.find("devices/disk/source").get("file")
- for pool in virt_data.pools:
- if os.path.basename(disk) in pool.listVolumes():
- virt_data.memory_pool.append(pool.name())
- found_the_pool = True
- # you could delete the pool but keep the volumes inside
- # which results in a functional VM but it wont have a
- # volume inside a pool that we can detect
- if not found_the_pool:
- virt_data.memory_pool.append("N/A")
-
- disk_info = get_disk_info(xml_doc)
- image_name = disk_info["file"]
- _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
- virt_data.disk_reads.append(size_abr(rd_bytes, 1))
- virt_data.disk_writes.append(size_abr(wr_bytes, 1))
-
- network_info = get_network_info(xml_doc)
- virt_data.macs.append(network_info["address"])
- # virt_data.ips.append(get_ip_by_arp(network_info["address"]))
- # TODO-this is obviously not going to work for remote URIs
- virt_data.ips.append(
- get_ip_from_arp_table(arp_table, network_info["address"])
- )
+def read_config(config_path) -> ConfigData:
+ """read the config"""
+ config_data = ConfigData()
+ try:
+ with open(os.path.expanduser(config_path), "rb") as conf_file:
+ data = tomllib.load(conf_file)
+ for key, value in data.items():
+ match key:
+ case "sasl_user":
+ config_data.sasl_user = value
+ case "sasl_password":
+ config_data.sasl_password = value
+ case "color":
+ for key, value in value.items():
+ config_data.color[key] = value
+ case _:
+ print(f"warning: unknown key, {key}, found.")
+ except FileNotFoundError:
+ pass
+
+ return config_data
+
+
+def curses_init() -> None:
+ """Initialize ncurses."""
+ curses.start_color()
+ curses.use_default_colors()
+ curses.curs_set(False)
+ curses.noecho()
+ curses.cbreak()
+ stdscr.keypad(True)
+ curses.halfdelay(4)
+
+
+def init_color_pairs(config_data: ConfigData) -> None:
+ """Initialize the curses color pairs."""
+ curses.init_pair(
+ 1, config_data.color["name_column_fg"], config_data.color["name_column_bg"]
+ )
+ curses.init_pair(
+ 2, config_data.color["active_row_fg"], config_data.color["active_row_bg"]
+ )
+ curses.init_pair(
+ 3, config_data.color["inactive_row_fg"], config_data.color["inactive_row_bg"]
+ )
+ curses.init_pair(4, config_data.color["box_fg"], config_data.color["box_bg"])
+ curses.init_pair(
+ 5, config_data.color["selected_fg"], config_data.color["selected_bg"]
+ )
+
+def get_visible_rows(max_rows: int, sel: int) -> typing.Tuple[int, int]:
+ """Returns the range of columns that will be visible based on max_rows."""
+ win_min_row = sel + 2 - int(max_rows / 2)
+ win_min_row = max(win_min_row, 0)
+ win_max_row = sel + 2 + int(max_rows / 2)
+ win_max_row = max(win_max_row, max_rows)
+ return win_min_row, win_max_row
-def main() -> None:
+
+async def main() -> None:
"""entrypoint"""
+ sel: int = 0
+ current_row: int = 0
+ current_visi: int = 0
+ vm_name_ordered_list: typing.List[str] = []
+
+ curses_init()
signal.signal(signal.SIGINT, sig_handler_sigint)
argparser = Argparser()
- print(Colors.screen_clear, end="")
+ config_data = read_config(argparser.args.config)
+ arp_table = get_arp_table()
+ init_color_pairs(config_data)
while True:
+ stdscr.clear()
virt_data = VirtData()
- arp_table = get_arp_table()
for hv_host in argparser.args.uri:
- conn = libvirt.openReadOnly(hv_host)
- active_hosts = conn.listDomainsID()
- if len(active_hosts) > 0:
+ auth = [
+ [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
+ request_cred,
+ None,
+ ]
+ # conn = libvirt.openAuth(hv_host, auth, libvirt.VIR_CONNECT_RO)
+ conn = libvirt.openAuth(hv_host, auth, 0)
+ hosts = conn.listAllDomains()
+ if len(hosts) > 0:
virt_data.pools = conn.listAllStoragePools()
- # for pool in virt_data.pools:
- # print(pool.listVolumes())
- # networks = conn.listAllNetworks()
- # print([pool.name() for pool in conn.listAllStoragePools()])
- # print([net.name() for net in conn.listAllNetworks()])
- virt_data.vm_id = [
- repr(vm_id) for vm_id in conn.listDomainsID()
- ]
- fill_virt_data_uri(conn, active_hosts, virt_data, arp_table)
- # for conn_id in conn.listAllDomains():
- # print(conn_id.name())
- # print(conn_id.state())
+ fill_virt_data_uri(
+ conn, hosts, virt_data, arp_table, argparser.args.active
+ )
else:
print("no active VMs found.")
- sys.exit(1)
+ time.sleep(3)
+ continue
lines = ffs(
2,
@@ -370,13 +469,80 @@ def main() -> None:
virt_data.uri,
virt_data.memory_pool,
)
- for line in lines:
- print(line)
- time.sleep(argparser.args.delay)
- # clears the screen
- print(Colors.screen_clear, end="")
- print(Colors.hide_cursor, end="")
+ stdscr.attron(curses.color_pair(4))
+ stdscr.box()
+ stdscr.attroff(curses.color_pair(4))
+ stdscr.addstr(1, 1, lines[0], curses.color_pair(1))
+
+ max_rows, _ = stdscr.getmaxyx()
+
+ win_min_row, win_max_row = get_visible_rows(max_rows - 3, sel)
+
+ current_row = 0
+ current_visi = 0
+ active_ids: typing.List[int] = []
+ for count, (line, vm_id, vm_name) in enumerate(
+ zip(lines[1:], virt_data.vm_id, virt_data.name)
+ ):
+ if int(vm_id) >= 0:
+ vm_name_ordered_list.append(vm_name)
+ active_ids.append(count)
+ current_row += 1
+ if current_row > win_max_row or current_row <= win_min_row:
+ continue
+ if sel == current_row - 1:
+ stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(5))
+ else:
+ stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(2))
+ current_visi += 1
+
+ inactive_count: int = len(active_ids)
+ if not argparser.args.active:
+ for count, (line, vm_name) in enumerate(zip(lines[1:], virt_data.name)):
+ if count not in active_ids:
+ vm_name_ordered_list.append(vm_name)
+ inactive_count += 1
+ current_row += 1
+ if current_row >= win_max_row or current_row < win_min_row:
+ continue
+ if sel == current_row - 1:
+ stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(5))
+ else:
+ stdscr.addstr(current_visi + 2, 1, line, curses.color_pair(3))
+ current_visi += 1
+
+ char = stdscr.getch()
+ if char == ord("j") or char == curses.KEY_DOWN:
+ sel = (sel + 1) % (len(lines) - 1)
+ elif char == ord("k") or char == curses.KEY_UP:
+ sel = (sel - 1) % (len(lines) - 1)
+ elif char == ord("g"):
+ sel = 0
+ elif char == ord("G"):
+ sel = len(lines) - 2
+ elif char == ord("q"):
+ break
+ elif char == ord("d"):
+ dom = conn.lookupByName(vm_name_ordered_list[sel])
+ dom.destroyFlags(flags=libvirt.VIR_DOMAIN_DESTROY_GRACEFUL)
+ elif char == ord("s"):
+ dom = conn.lookupByName(vm_name_ordered_list[sel])
+ dom.shutdownFlags()
+ elif char == ord("r"):
+ dom = conn.lookupByName(vm_name_ordered_list[sel])
+ dom.createWithFlags()
+ else:
+ pass
+
+ stdscr.refresh()
+# FIXME- the finally wipes the screen, effectively rendering the help option useless
if __name__ == "__main__":
- main()
+ stdscr = curses.initscr()
+ try:
+ asyncio.run(main())
+ except Exception as e:
+ print(e)
+ finally:
+ do_cleanup()