aboutsummaryrefslogtreecommitdiffstats
path: root/bin/virttop
diff options
context:
space:
mode:
Diffstat (limited to 'bin/virttop')
-rwxr-xr-xbin/virttop241
1 files changed, 241 insertions, 0 deletions
diff --git a/bin/virttop b/bin/virttop
new file mode 100755
index 0000000..4310d5d
--- /dev/null
+++ b/bin/virttop
@@ -0,0 +1,241 @@
+#!/usr/bin/env python
+"""virt top"""
+
+# ideally we would like to use the monkeypatch but it is untested
+# and experimental
+# import defusedxml # type:ignore
+# defusedxml.defuse_stdlib()
+import argparse
+import subprocess
+import time
+import typing
+from defusedxml import ElementTree # type:ignore
+from defusedxml import minidom
+import libvirt # type:ignore
+
+hv = ["qemu:///system"]
+
+
+class Argparser: # pylint: disable=too-few-public-methods
+ """Argparser class."""
+
+ def __init__(self):
+ self.parser = argparse.ArgumentParser()
+ self.parser.add_argument(
+ "--delay",
+ "-d",
+ type=float,
+ help="the delay between updates",
+ default=5,
+ )
+ self.args = self.parser.parse_args()
+
+
+# pylint: disable=too-few-public-methods
+class Colors:
+ """static color definitions"""
+
+ purple = "\033[95m"
+ blue = "\033[94m"
+ green = "\033[92m"
+ yellow = "\033[93m"
+ red = "\033[91m"
+ grey = "\033[1;37m"
+ darkgrey = "\033[1;30m"
+ cyan = "\033[1;36m"
+ ENDC = "\033[0m"
+ BOLD = "\033[1m"
+ UNDERLINE = "\033[4m"
+ blueblue = "\x1b[38;5;24m"
+ greenie = "\x1b[38;5;23m"
+ goo = "\x1b[38;5;22m"
+
+
+def get_network_info(xml) -> typing.Dict[str, str]:
+ """returns the network info"""
+ result_dict = {}
+ interface_types = xml.getElementsByTagName("interface")
+ for interface_type in interface_types:
+ interface_nodes = interface_type.childNodes
+ for interface_node in interface_nodes:
+ if interface_node.nodeName[0:1] != "#":
+ for attr in interface_node.attributes.keys():
+ result_dict[
+ interface_node.attributes[attr].name
+ ] = interface_node.attributes[attr].value
+ return result_dict
+
+
+def get_ip_by_arp(mac: str) -> str:
+ """get ip adress using arp"""
+ # arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}'
+ try:
+ proc1 = subprocess.run(["arp", "-n"], capture_output=True, check=True)
+ proc2 = subprocess.run(
+ ["grep", mac], input=proc1.stdout, capture_output=True, check=True
+ )
+ proc3 = subprocess.run(
+ ["awk", "{print $1}"],
+ input=proc2.stdout,
+ capture_output=True,
+ check=True,
+ )
+ ip_address = proc3.stdout.decode("utf-8").strip("\n")
+ except:
+ ip_address = "N/A"
+ return ip_address
+
+
+def get_disk_info(xml) -> typing.Dict[str, str]:
+ """returns the disk info"""
+ result_dict: typing.Dict = {}
+ disk_types = xml.getElementsByTagName("disk")
+ for disk_type in disk_types:
+ disk_nodes = disk_type.childNodes
+ for disk_node in disk_nodes:
+ if disk_node.nodeName[0:1] != "#":
+ for attr in disk_node.attributes.keys():
+ result_dict[
+ disk_node.attributes[attr].name
+ ] = disk_node.attributes[attr].value
+
+ return result_dict
+
+
+def ffs(offset, header_list, numbered, *args):
+ """A simple columnar printer"""
+ max_column_width = []
+ lines = []
+ numbers_f = []
+ dummy = []
+
+ if numbered:
+ numbers_f.extend(range(1, len(args[-1]) + 1))
+ max_column_width.append(
+ max([len(repr(number)) for number in numbers_f])
+ )
+ header_list.insert(0, "idx")
+
+ for arg in args:
+ max_column_width.append(max([len(repr(argette)) for argette in arg]))
+
+ index = range(0, len(header_list))
+ for header, width, i in zip(header_list, max_column_width, index):
+ max_column_width[i] = max(len(header), width) + offset
+
+ for i in index:
+ dummy.append(
+ Colors.greenie
+ + Colors.BOLD
+ + header_list[i].ljust(max_column_width[i])
+ + Colors.ENDC
+ )
+ lines.append("".join(dummy))
+ dummy.clear()
+
+ index2 = range(0, len(args[-1]))
+ for i in index2:
+ if numbered:
+ dummy.append(
+ Colors.goo
+ + Colors.BOLD
+ + repr(i).ljust(max_column_width[0])
+ + Colors.ENDC
+ )
+ for arg, width in zip(args, max_column_width[1:]):
+ dummy.append(
+ Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC
+ )
+ else:
+ for arg, width in zip(args, max_column_width):
+ dummy.append(
+ Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC
+ )
+ lines.append("".join(dummy))
+ dummy.clear()
+ return lines
+
+
+def size_abr(num: float, shift_by: float) -> str:
+ num = num * shift_by
+ if num < 1000:
+ return repr(num)
+ elif num < 1_000_000.0:
+ return repr(num / 1_000) + " KB"
+ elif num < 1_000_000_000:
+ return repr(num / 1_000_000) + " MB"
+ elif num < 1_000_000_000_000:
+ return repr(num / 1_000_000_000) + " GB"
+ return "N/A"
+
+
+def main() -> None:
+ """entrypoint"""
+ # argparser = Argparser()
+ name = []
+ cpu_times = []
+ mem_actual = []
+ mem_unused = []
+ write_bytes = []
+ read_bytes = []
+ macs = []
+ ips = []
+ for hv_host in hv:
+ conn = libvirt.openReadOnly(hv_host)
+ active_hosts = conn.listDomainsID()
+ for host_id in active_hosts:
+ dom = conn.lookupByID(host_id)
+ cpu_times.append(
+ repr(
+ int(
+ dom.getCPUStats(total=True)[0]["cpu_time"]
+ / 1_000_000_000
+ )
+ )
+ + " s"
+ )
+ raw_xml = dom.XMLDesc()
+ xml = minidom.parseString(raw_xml)
+ name.append(dom.name())
+ mem_stats = dom.memoryStats()
+ mem_actual.append(size_abr(mem_stats["actual"], 1000))
+ mem_unused.append(size_abr(mem_stats["unused"], 1000))
+ tree = ElementTree.fromstring(dom.XMLDesc())
+ iface = tree.find("devices/interface/target").get("dev")
+ stats = dom.interfaceStats(iface)
+ write_bytes.append(size_abr(stats[4], 1))
+ read_bytes.append(size_abr(stats[0], 1))
+ get_disk_info(xml)
+ network_info = get_network_info(xml)
+ macs.append(network_info["address"])
+ ips.append(get_ip_by_arp(network_info["address"]))
+
+ lines = ffs(
+ 3,
+ [
+ "name",
+ "cpu",
+ "mem_actual",
+ "mem_unused",
+ "write_B",
+ "read_B",
+ "MAC",
+ "IP",
+ ],
+ True,
+ name,
+ cpu_times,
+ mem_actual,
+ mem_unused,
+ write_bytes,
+ read_bytes,
+ macs,
+ ips,
+ )
+ for line in lines:
+ print(line)
+ # time.sleep(argparser.args.delay)
+
+
+if __name__ == "__main__":
+ main()