From e0a9a4c74fd973291c5b726ae969714b86491c3f Mon Sep 17 00:00:00 2001 From: terminaldweller Date: Mon, 26 Dec 2022 11:55:09 +0330 Subject: update --- bin/virttop | 221 ++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 133 insertions(+), 88 deletions(-) (limited to 'bin') diff --git a/bin/virttop b/bin/virttop index 4310d5d..ef62c20 100755 --- a/bin/virttop +++ b/bin/virttop @@ -6,15 +6,16 @@ # import defusedxml # type:ignore # defusedxml.defuse_stdlib() import argparse +import dataclasses import subprocess -import time + +# import time import typing +from xml.dom.minidom import Document from defusedxml import ElementTree # type:ignore from defusedxml import minidom import libvirt # type:ignore -hv = ["qemu:///system"] - class Argparser: # pylint: disable=too-few-public-methods """Argparser class.""" @@ -25,9 +26,17 @@ class Argparser: # pylint: disable=too-few-public-methods "--delay", "-d", type=float, - help="the delay between updates", + help="The delay between updates", default=5, ) + self.parser.add_argument( + "--uri", + "-u", + nargs="+", + type=str, + help="A list of URIs to connect to seperated by commas", + default=["qemu:///system"], + ) self.args = self.parser.parse_args() @@ -51,10 +60,29 @@ class Colors: goo = "\x1b[38;5;22m" -def get_network_info(xml) -> typing.Dict[str, str]: +@dataclasses.dataclass +# pylint: disable=too-many-instance-attributes +class VirtData: + """Holds the data that we collect to display to the user""" + + name: typing.List[str] = dataclasses.field(default_factory=list) + cpu_times: typing.List[str] = dataclasses.field(default_factory=list) + mem_actual: typing.List[str] = dataclasses.field(default_factory=list) + mem_unused: typing.List[str] = dataclasses.field(default_factory=list) + write_bytes: typing.List[str] = dataclasses.field(default_factory=list) + read_bytes: typing.List[str] = dataclasses.field(default_factory=list) + macs: typing.List[str] = dataclasses.field(default_factory=list) + ips: typing.List[str] = dataclasses.field(default_factory=list) + disk_reads: typing.List[str] = dataclasses.field(default_factory=list) + disk_writes: typing.List[str] = dataclasses.field(default_factory=list) + + +def get_network_info( + xml_doc: Document, +) -> typing.Dict[str, str]: """returns the network info""" result_dict = {} - interface_types = xml.getElementsByTagName("interface") + interface_types = xml_doc.getElementsByTagName("interface") for interface_type in interface_types: interface_nodes = interface_type.childNodes for interface_node in interface_nodes: @@ -67,29 +95,41 @@ def get_network_info(xml) -> typing.Dict[str, str]: def get_ip_by_arp(mac: str) -> str: - """get ip adress using arp""" + """Get ip adress using the arp table.""" # arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}' try: - proc1 = subprocess.run(["arp", "-n"], capture_output=True, check=True) + proc1 = subprocess.run( + ["/usr/bin/arp", "-n"], + capture_output=True, + check=True, + shell=False, + ) proc2 = subprocess.run( - ["grep", mac], input=proc1.stdout, capture_output=True, check=True + ["/usr/bin/grep", mac], + input=proc1.stdout, + capture_output=True, + check=True, + shell=False, ) proc3 = subprocess.run( - ["awk", "{print $1}"], + ["/usr/bin/awk", "{print $1}"], input=proc2.stdout, capture_output=True, check=True, + shell=False, ) ip_address = proc3.stdout.decode("utf-8").strip("\n") - except: + except subprocess.CalledProcessError: ip_address = "N/A" return ip_address -def get_disk_info(xml) -> typing.Dict[str, str]: +def get_disk_info( + xml_doc: Document, +) -> typing.Dict[str, str]: """returns the disk info""" result_dict: typing.Dict = {} - disk_types = xml.getElementsByTagName("disk") + disk_types = xml_doc.getElementsByTagName("disk") for disk_type in disk_types: disk_nodes = disk_type.childNodes for disk_node in disk_nodes: @@ -102,22 +142,20 @@ def get_disk_info(xml) -> typing.Dict[str, str]: return result_dict -def ffs(offset, header_list, numbered, *args): +def ffs(offset: int, header_list: typing.List[str], numbered: bool, *args): """A simple columnar printer""" max_column_width = [] lines = [] - numbers_f = [] + numbers_f: typing.List[int] = [] dummy = [] if numbered: numbers_f.extend(range(1, len(args[-1]) + 1)) - max_column_width.append( - max([len(repr(number)) for number in numbers_f]) - ) + max_column_width.append(max(len(repr(number)) for number in numbers_f)) header_list.insert(0, "idx") for arg in args: - max_column_width.append(max([len(repr(argette)) for argette in arg])) + max_column_width.append(max(len(repr(argette)) for argette in arg)) index = range(0, len(header_list)) for header, width, i in zip(header_list, max_column_width, index): @@ -144,12 +182,12 @@ def ffs(offset, header_list, numbered, *args): ) for arg, width in zip(args, max_column_width[1:]): dummy.append( - Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC + Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC ) else: for arg, width in zip(args, max_column_width): dummy.append( - Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC + Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC ) lines.append("".join(dummy)) dummy.clear() @@ -157,83 +195,90 @@ def ffs(offset, header_list, numbered, *args): def size_abr(num: float, shift_by: float) -> str: + """Rounds and abbreviates floats.""" num = num * shift_by if num < 1000: return repr(num) - elif num < 1_000_000.0: - return repr(num / 1_000) + " KB" - elif num < 1_000_000_000: - return repr(num / 1_000_000) + " MB" - elif num < 1_000_000_000_000: - return repr(num / 1_000_000_000) + " GB" + if num < 1_000_000.0: + return repr(round(num / 1_000, 2)) + " KB" + if num < 1_000_000_000: + return repr(round(num / 1_000_000, 2)) + " MB" + if num < 1_000_000_000_000: + return repr(round(num / 1_000_000_000, 2)) + " GB" return "N/A" +def fill_virt_data_uri(conn, active_hosts, virt_data: VirtData) -> None: + """fill VirtData for one URI.""" + for host_id in active_hosts: + dom = conn.lookupByID(host_id) + virt_data.cpu_times.append( + repr( + int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000) + ) + + "s" + ) + xml_doc = minidom.parseString(dom.XMLDesc()) + virt_data.name.append(dom.name()) + + mem_stats = dom.memoryStats() + virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000)) + virt_data.mem_unused.append(size_abr(mem_stats["unused"], 1000)) + + tree = ElementTree.fromstring(dom.XMLDesc()) + iface = tree.find("devices/interface/target").get("dev") + stats = dom.interfaceStats(iface) + virt_data.write_bytes.append(size_abr(stats[4], 1)) + virt_data.read_bytes.append(size_abr(stats[0], 1)) + + disk_info = get_disk_info(xml_doc) + image_name = disk_info["file"] + _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name) + virt_data.disk_reads.append(size_abr(rd_bytes, 1)) + virt_data.disk_writes.append(size_abr(wr_bytes, 1)) + + network_info = get_network_info(xml_doc) + virt_data.macs.append(network_info["address"]) + virt_data.ips.append(get_ip_by_arp(network_info["address"])) + + def main() -> None: """entrypoint""" - # argparser = Argparser() - name = [] - cpu_times = [] - mem_actual = [] - mem_unused = [] - write_bytes = [] - read_bytes = [] - macs = [] - ips = [] - for hv_host in hv: + argparser = Argparser() + virt_data = VirtData() + for hv_host in argparser.args.uri: conn = libvirt.openReadOnly(hv_host) active_hosts = conn.listDomainsID() - for host_id in active_hosts: - dom = conn.lookupByID(host_id) - cpu_times.append( - repr( - int( - dom.getCPUStats(total=True)[0]["cpu_time"] - / 1_000_000_000 - ) - ) - + " s" - ) - raw_xml = dom.XMLDesc() - xml = minidom.parseString(raw_xml) - name.append(dom.name()) - mem_stats = dom.memoryStats() - mem_actual.append(size_abr(mem_stats["actual"], 1000)) - mem_unused.append(size_abr(mem_stats["unused"], 1000)) - tree = ElementTree.fromstring(dom.XMLDesc()) - iface = tree.find("devices/interface/target").get("dev") - stats = dom.interfaceStats(iface) - write_bytes.append(size_abr(stats[4], 1)) - read_bytes.append(size_abr(stats[0], 1)) - get_disk_info(xml) - network_info = get_network_info(xml) - macs.append(network_info["address"]) - ips.append(get_ip_by_arp(network_info["address"])) - - lines = ffs( - 3, - [ - "name", - "cpu", - "mem_actual", - "mem_unused", - "write_B", - "read_B", - "MAC", - "IP", - ], - True, - name, - cpu_times, - mem_actual, - mem_unused, - write_bytes, - read_bytes, - macs, - ips, - ) - for line in lines: - print(line) + fill_virt_data_uri(conn, active_hosts, virt_data) + + lines = ffs( + 3, + [ + "name", + "cpu", + "mem_actual", + "mem_unused", + "net_write_B", + "net_read_B", + "MAC", + "IP", + "IO_read_B", + "IO_write_B", + ], + True, + virt_data.name, + virt_data.cpu_times, + virt_data.mem_actual, + virt_data.mem_unused, + virt_data.write_bytes, + virt_data.read_bytes, + virt_data.macs, + virt_data.ips, + virt_data.disk_reads, + virt_data.disk_writes, + ) + for line in lines: + print(line) # time.sleep(argparser.args.delay) -- cgit v1.2.3