aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorterminaldweller <thabogre@gmail.com>2023-02-14 10:38:11 +0000
committerterminaldweller <thabogre@gmail.com>2023-02-14 10:38:11 +0000
commite71aa0e8c9392c1b4118986172f9e6552bb01c5c (patch)
treebb8b75cd4e80e300a02e5e1e56faaaf1edf3a35c
parentInitial commit (diff)
downloadvirttop-e71aa0e8c9392c1b4118986172f9e6552bb01c5c.tar.gz
virttop-e71aa0e8c9392c1b4118986172f9e6552bb01c5c.zip
initial commit
-rw-r--r--poetry.lock29
-rw-r--r--pyproject.toml31
-rwxr-xr-xvirttop/virttop.py382
3 files changed, 442 insertions, 0 deletions
diff --git a/poetry.lock b/poetry.lock
new file mode 100644
index 0000000..d7b6a32
--- /dev/null
+++ b/poetry.lock
@@ -0,0 +1,29 @@
+[[package]]
+name = "defusedxml"
+version = "0.7.1"
+description = "XML bomb protection for Python stdlib modules"
+category = "main"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
+
+[[package]]
+name = "libvirt-python"
+version = "9.0.0"
+description = "The libvirt virtualization API python binding"
+category = "main"
+optional = false
+python-versions = "*"
+
+[metadata]
+lock-version = "1.1"
+python-versions = "^3.11"
+content-hash = "70c901717e9058eccd3cb96afaf55ad6f1d1a44c78122d35a0f35fc7d4e4abca"
+
+[metadata.files]
+defusedxml = [
+ {file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
+ {file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"},
+]
+libvirt-python = [
+ {file = "libvirt-python-9.0.0.tar.gz", hash = "sha256:49702d33fa8cbcae19fa727467a69f7ae2241b3091324085ca1cc752b2b414ce"},
+]
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..2e184b9
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,31 @@
+[tool.poetry]
+name = "virttop"
+version = "0.1.0"
+description = "A top like utility for libvirt"
+authors = ["terminaldwelelr <devi@terminaldweller.com>"]
+license = "GPL-3.0"
+readme = "README.md"
+homepage = "https://github.com/terminaldweller/virttop"
+repository = "https://github.com/terminaldweller/virttop"
+keywords = ["libvirt","top"]
+classifiers = [
+ "Environment :: Console",
+]
+include = [
+ "LICENSE",
+]
+packages = [
+{include = "virttop"}
+]
+
+[tool.poetry.scripts]
+tunneltop = "virttop.virttop:main"
+
+[tool.poetry.dependencies]
+python = "^3.11"
+libvirt-python = "^9.0.0"
+defusedxml = "^0.7.1"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/virttop/virttop.py b/virttop/virttop.py
new file mode 100755
index 0000000..0a1e4f0
--- /dev/null
+++ b/virttop/virttop.py
@@ -0,0 +1,382 @@
+#!/usr/bin/env python
+"""virt top"""
+
+# ideally we would like to use the monkeypatch but it is untested
+# and experimental
+# import defusedxml # type:ignore
+# defusedxml.defuse_stdlib()
+import argparse
+import csv
+import dataclasses
+import enum
+import os
+import signal
+import sys
+import time
+import typing
+
+# we are only using this for type annotation
+from xml.dom.minidom import Document # nosec
+
+from defusedxml import ElementTree # type:ignore
+from defusedxml import minidom
+import libvirt # type:ignore
+
+
+# pylint: disable=unused-argument
+def sig_handler_sigint(signum, frame):
+ """Just to handle C-c gracefully"""
+ print()
+ sys.exit(0)
+
+
+class Argparser: # pylint: disable=too-few-public-methods
+ """Argparser class."""
+
+ def __init__(self):
+ self.parser = argparse.ArgumentParser()
+ self.parser.add_argument(
+ "--delay",
+ "-d",
+ type=float,
+ help="The delay between updates",
+ default=5,
+ )
+ self.parser.add_argument(
+ "--uri",
+ "-u",
+ nargs="+",
+ type=str,
+ help="A list of URIs to connect to seperated by commas",
+ default=["qemu:///system"],
+ )
+ self.args = self.parser.parse_args()
+
+
+# pylint: disable=too-few-public-methods
+class Colors(enum.EnumType):
+ """static color definitions"""
+
+ purple = "\033[95m"
+ blue = "\033[94m"
+ green = "\033[92m"
+ yellow = "\033[93m"
+ red = "\033[91m"
+ grey = "\033[1;37m"
+ darkgrey = "\033[1;30m"
+ cyan = "\033[1;36m"
+ ENDC = "\033[0m"
+ BOLD = "\033[1m"
+ UNDERLINE = "\033[4m"
+ blueblue = "\x1b[38;5;24m"
+ greenie = "\x1b[38;5;23m"
+ goo = "\x1b[38;5;22m"
+ screen_clear = "\033c\033[3J"
+ hide_cursor = "\033[?25l"
+
+
+@dataclasses.dataclass
+# pylint: disable=too-many-instance-attributes
+class VirtData:
+ """Holds the data that we collect to display to the user"""
+
+ vm_id: typing.List[str] = dataclasses.field(default_factory=list)
+ name: typing.List[str] = dataclasses.field(default_factory=list)
+ cpu_times: typing.List[str] = dataclasses.field(default_factory=list)
+ mem_actual: typing.List[str] = dataclasses.field(default_factory=list)
+ mem_unused: typing.List[str] = dataclasses.field(default_factory=list)
+ write_bytes: typing.List[str] = dataclasses.field(default_factory=list)
+ read_bytes: typing.List[str] = dataclasses.field(default_factory=list)
+ macs: typing.List[str] = dataclasses.field(default_factory=list)
+ ips: typing.List[str] = dataclasses.field(default_factory=list)
+ disk_reads: typing.List[str] = dataclasses.field(default_factory=list)
+ disk_writes: typing.List[str] = dataclasses.field(default_factory=list)
+ snapshot_counts: typing.List[str] = dataclasses.field(default_factory=list)
+ uri: typing.List[str] = dataclasses.field(default_factory=list)
+ memory_pool: typing.List[str] = dataclasses.field(default_factory=list)
+
+ pools: typing.List[libvirt.virStoragePool] = dataclasses.field(
+ default_factory=list
+ )
+
+
+def get_network_info(
+ xml_doc: Document,
+) -> typing.Dict[str, str]:
+ """returns the network info"""
+ result_dict = {}
+ interface_types = xml_doc.getElementsByTagName("interface")
+ for interface_type in interface_types:
+ interface_nodes = interface_type.childNodes
+ for interface_node in interface_nodes:
+ if interface_node.nodeName[0:1] != "#":
+ for attr in interface_node.attributes.keys():
+ result_dict[
+ interface_node.attributes[attr].name
+ ] = interface_node.attributes[attr].value
+ return result_dict
+
+
+def get_arp_table() -> typing.Dict[str, str]:
+ """Get the ARP table. return a dict with MAC/IP as key/value pair."""
+ result: typing.Dict[str, str] = {}
+ with open("/proc/net/arp", encoding="utf-8") as arp_table:
+ reader = csv.reader(arp_table, skipinitialspace=True, delimiter=" ")
+ for arp_entry in reader:
+ result[arp_entry[3]] = arp_entry[0]
+
+ return result
+
+
+def get_ip_from_arp_table(arp_table: typing.Dict[str, str], mac: str) -> str:
+ """get IP from MAC address using the arp table"""
+ try:
+ return arp_table[mac]
+ except KeyError:
+ return "N/A"
+
+
+def get_disk_info(
+ xml_doc: Document,
+) -> typing.Dict[str, str]:
+ """returns the disk info"""
+ result_dict: typing.Dict = {}
+ disk_types = xml_doc.getElementsByTagName("disk")
+ for disk_type in disk_types:
+ disk_nodes = disk_type.childNodes
+ for disk_node in disk_nodes:
+ if disk_node.nodeName[0:1] != "#":
+ for attr in disk_node.attributes.keys():
+ result_dict[
+ disk_node.attributes[attr].name
+ ] = disk_node.attributes[attr].value
+
+ return result_dict
+
+
+# pylint: disable=too-many-locals
+def ffs(
+ offset: int,
+ header_list: typing.Optional[typing.List[str]],
+ numbered: bool,
+ *args,
+) -> typing.List[str]:
+ """A simple columnar printer"""
+ max_column_width = []
+ lines = []
+ numbers_f: typing.List[int] = []
+ dummy = []
+
+ if sys.stdout.isatty():
+ greenie = Colors.greenie
+ bold = Colors.BOLD
+ endc = Colors.ENDC
+ goo = Colors.goo
+ blueblue = Colors.blueblue
+ else:
+ greenie = ""
+ bold = ""
+ endc = ""
+ goo = ""
+ blueblue = ""
+
+ for arg in args:
+ max_column_width.append(max(len(repr(argette)) for argette in arg))
+
+ if header_list is not None:
+ if numbered:
+ numbers_f.extend(range(1, len(args[-1]) + 1))
+ max_column_width.append(
+ max(len(repr(number)) for number in numbers_f)
+ )
+ header_list.insert(0, "idx")
+
+ index = range(0, len(header_list))
+ for header, width, i in zip(header_list, max_column_width, index):
+ max_column_width[i] = max(len(header), width) + offset
+
+ for i in index:
+ dummy.append(
+ greenie
+ + bold
+ + header_list[i].ljust(max_column_width[i])
+ + endc
+ )
+ lines.append("".join(dummy))
+ dummy.clear()
+
+ index2 = range(0, len(args[-1]))
+ for i in index2:
+ if numbered:
+ dummy.append(
+ goo + bold + repr(i).ljust(max_column_width[0]) + endc
+ )
+ for arg, width in zip(args, max_column_width[1:]):
+ dummy.append(blueblue + (arg[i]).ljust(width) + endc)
+ else:
+ for arg, width in zip(args, max_column_width):
+ dummy.append(blueblue + (arg[i]).ljust(width) + endc)
+ lines.append("".join(dummy))
+ dummy.clear()
+ return lines
+
+
+def size_abr(num: float, shift_by: float) -> str:
+ """Rounds and abbreviates floats."""
+ num = num * shift_by
+ if num < 1000:
+ return repr(num)
+ if num < 1_000_000.0:
+ return repr(round(num / 1_000, 2)) + " KB"
+ if num < 1_000_000_000:
+ return repr(round(num / 1_000_000, 2)) + " MB"
+ if num < 1_000_000_000_000:
+ return repr(round(num / 1_000_000_000, 2)) + " GB"
+ return "N/A"
+
+
+# pylint: disable=too-many-locals
+def fill_virt_data_uri(
+ conn: libvirt.virConnect,
+ active_hosts: typing.List[int],
+ virt_data: VirtData,
+ arp_table: typing.Dict[str, str],
+) -> None:
+ """fill VirtData for one URI."""
+ for host_id in active_hosts:
+ virt_data.uri.append(conn.getURI())
+ dom = conn.lookupByID(host_id)
+ virt_data.snapshot_counts.append(repr(dom.snapshotNum()))
+ try:
+ virt_data.cpu_times.append(
+ repr(
+ int(
+ dom.getCPUStats(total=True)[0]["cpu_time"]
+ / 1_000_000_000
+ )
+ )
+ + "s"
+ )
+ except:
+ virt_data.cpu_times.append("n/a")
+ xml_doc = minidom.parseString(dom.XMLDesc())
+ virt_data.name.append(dom.name())
+
+ mem_stats = dom.memoryStats()
+ if "actual" in mem_stats:
+ virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
+ else:
+ virt_data.mem_actual.append("n/a")
+
+ # BSD guests dont support mem balloons?
+ try:
+ virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000))
+ except KeyError:
+ virt_data.mem_unused.append("N/A")
+
+ tree = ElementTree.fromstring(dom.XMLDesc())
+ iface = tree.find("devices/interface/target").get("dev")
+ stats = dom.interfaceStats(iface)
+ virt_data.write_bytes.append(size_abr(stats[4], 1))
+ virt_data.read_bytes.append(size_abr(stats[0], 1))
+
+ found_the_pool: bool = False
+ disk = tree.find("devices/disk/source").get("file")
+ for pool in virt_data.pools:
+ if os.path.basename(disk) in pool.listVolumes():
+ virt_data.memory_pool.append(pool.name())
+ found_the_pool = True
+ # you could delete the pool but keep the volumes inside
+ # which results in a functional VM but it wont have a
+ # volume inside a pool that we can detect
+ if not found_the_pool:
+ virt_data.memory_pool.append("N/A")
+
+ disk_info = get_disk_info(xml_doc)
+ image_name = disk_info["file"]
+ _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
+ virt_data.disk_reads.append(size_abr(rd_bytes, 1))
+ virt_data.disk_writes.append(size_abr(wr_bytes, 1))
+
+ network_info = get_network_info(xml_doc)
+ virt_data.macs.append(network_info["address"])
+ # virt_data.ips.append(get_ip_by_arp(network_info["address"]))
+ # TODO-this is obviously not going to work for remote URIs
+ virt_data.ips.append(
+ get_ip_from_arp_table(arp_table, network_info["address"])
+ )
+
+
+def main() -> None:
+ """entrypoint"""
+ signal.signal(signal.SIGINT, sig_handler_sigint)
+ argparser = Argparser()
+ print(Colors.screen_clear, end="")
+ while True:
+ virt_data = VirtData()
+ arp_table = get_arp_table()
+ for hv_host in argparser.args.uri:
+ conn = libvirt.openReadOnly(hv_host)
+ active_hosts = conn.listDomainsID()
+ if len(active_hosts) > 0:
+ virt_data.pools = conn.listAllStoragePools()
+ # for pool in virt_data.pools:
+ # print(pool.listVolumes())
+ # networks = conn.listAllNetworks()
+ # print([pool.name() for pool in conn.listAllStoragePools()])
+ # print([net.name() for net in conn.listAllNetworks()])
+ virt_data.vm_id = [
+ repr(vm_id) for vm_id in conn.listDomainsID()
+ ]
+ fill_virt_data_uri(conn, active_hosts, virt_data, arp_table)
+ # for conn_id in conn.listAllDomains():
+ # print(conn_id.name())
+ # print(conn_id.state())
+ else:
+ print("no active VMs found.")
+ sys.exit(1)
+
+ lines = ffs(
+ 2,
+ [
+ "ID",
+ "NAME",
+ "CPU",
+ "MEM_ACTUAL",
+ "MEM_AVAIL",
+ "NET_WRITE_B",
+ "NET_READ_B",
+ "MAC",
+ "IP",
+ "IO_READ_B",
+ "IO_WRITE_B",
+ "SNAPSHOTS",
+ "URI",
+ "STORAGE_POOL",
+ ],
+ False,
+ virt_data.vm_id,
+ virt_data.name,
+ virt_data.cpu_times,
+ virt_data.mem_actual,
+ virt_data.mem_unused,
+ virt_data.write_bytes,
+ virt_data.read_bytes,
+ virt_data.macs,
+ virt_data.ips,
+ virt_data.disk_reads,
+ virt_data.disk_writes,
+ virt_data.snapshot_counts,
+ virt_data.uri,
+ virt_data.memory_pool,
+ )
+ for line in lines:
+ print(line)
+ time.sleep(argparser.args.delay)
+ # clears the screen
+ print(Colors.screen_clear, end="")
+ print(Colors.hide_cursor, end="")
+
+
+if __name__ == "__main__":
+ main()