#!/usr/bin/env python
"""virt top"""
# ideally we would like to use the monkeypatch but it is untested
# and experimental
# import defusedxml # type:ignore
# defusedxml.defuse_stdlib()
import argparse
import subprocess
import time
import typing
from defusedxml import ElementTree # type:ignore
from defusedxml import minidom
import libvirt # type:ignore
hv = ["qemu:///system"]
class Argparser: # pylint: disable=too-few-public-methods
"""Argparser class."""
def __init__(self):
self.parser = argparse.ArgumentParser()
self.parser.add_argument(
"--delay",
"-d",
type=float,
help="the delay between updates",
default=5,
)
self.args = self.parser.parse_args()
# pylint: disable=too-few-public-methods
class Colors:
"""static color definitions"""
purple = "\033[95m"
blue = "\033[94m"
green = "\033[92m"
yellow = "\033[93m"
red = "\033[91m"
grey = "\033[1;37m"
darkgrey = "\033[1;30m"
cyan = "\033[1;36m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
blueblue = "\x1b[38;5;24m"
greenie = "\x1b[38;5;23m"
goo = "\x1b[38;5;22m"
def get_network_info(xml) -> typing.Dict[str, str]:
"""returns the network info"""
result_dict = {}
interface_types = xml.getElementsByTagName("interface")
for interface_type in interface_types:
interface_nodes = interface_type.childNodes
for interface_node in interface_nodes:
if interface_node.nodeName[0:1] != "#":
for attr in interface_node.attributes.keys():
result_dict[
interface_node.attributes[attr].name
] = interface_node.attributes[attr].value
return result_dict
def get_ip_by_arp(mac: str) -> str:
"""get ip adress using arp"""
# arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}'
try:
proc1 = subprocess.run(["arp", "-n"], capture_output=True, check=True)
proc2 = subprocess.run(
["grep", mac], input=proc1.stdout, capture_output=True, check=True
)
proc3 = subprocess.run(
["awk", "{print $1}"],
input=proc2.stdout,
capture_output=True,
check=True,
)
ip_address = proc3.stdout.decode("utf-8").strip("\n")
except:
ip_address = "N/A"
return ip_address
def get_disk_info(xml) -> typing.Dict[str, str]:
"""returns the disk info"""
result_dict: typing.Dict = {}
disk_types = xml.getElementsByTagName("disk")
for disk_type in disk_types:
disk_nodes = disk_type.childNodes
for disk_node in disk_nodes:
if disk_node.nodeName[0:1] != "#":
for attr in disk_node.attributes.keys():
result_dict[
disk_node.attributes[attr].name
] = disk_node.attributes[attr].value
return result_dict
def ffs(offset, header_list, numbered, *args):
"""A simple columnar printer"""
max_column_width = []
lines = []
numbers_f = []
dummy = []
if numbered:
numbers_f.extend(range(1, len(args[-1]) + 1))
max_column_width.append(
max([len(repr(number)) for number in numbers_f])
)
header_list.insert(0, "idx")
for arg in args:
max_column_width.append(max([len(repr(argette)) for argette in arg]))
index = range(0, len(header_list))
for header, width, i in zip(header_list, max_column_width, index):
max_column_width[i] = max(len(header), width) + offset
for i in index:
dummy.append(
Colors.greenie
+ Colors.BOLD
+ header_list[i].ljust(max_column_width[i])
+ Colors.ENDC
)
lines.append("".join(dummy))
dummy.clear()
index2 = range(0, len(args[-1]))
for i in index2:
if numbered:
dummy.append(
Colors.goo
+ Colors.BOLD
+ repr(i).ljust(max_column_width[0])
+ Colors.ENDC
)
for arg, width in zip(args, max_column_width[1:]):
dummy.append(
Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC
)
else:
for arg, width in zip(args, max_column_width):
dummy.append(
Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC
)
lines.append("".join(dummy))
dummy.clear()
return lines
def size_abr(num: float, shift_by: float) -> str:
num = num * shift_by
if num < 1000:
return repr(num)
elif num < 1_000_000.0:
return repr(num / 1_000) + " KB"
elif num < 1_000_000_000:
return repr(num / 1_000_000) + " MB"
elif num < 1_000_000_000_000:
return repr(num / 1_000_000_000) + " GB"
return "N/A"
def main() -> None:
"""entrypoint"""
# argparser = Argparser()
name = []
cpu_times = []
mem_actual = []
mem_unused = []
write_bytes = []
read_bytes = []
macs = []
ips = []
for hv_host in hv:
conn = libvirt.openReadOnly(hv_host)
active_hosts = conn.listDomainsID()
for host_id in active_hosts:
dom = conn.lookupByID(host_id)
cpu_times.append(
repr(
int(
dom.getCPUStats(total=True)[0]["cpu_time"]
/ 1_000_000_000
)
)
+ " s"
)
raw_xml = dom.XMLDesc()
xml = minidom.parseString(raw_xml)
name.append(dom.name())
mem_stats = dom.memoryStats()
mem_actual.append(size_abr(mem_stats["actual"], 1000))
mem_unused.append(size_abr(mem_stats["unused"], 1000))
tree = ElementTree.fromstring(dom.XMLDesc())
iface = tree.find("devices/interface/target").get("dev")
stats = dom.interfaceStats(iface)
write_bytes.append(size_abr(stats[4], 1))
read_bytes.append(size_abr(stats[0], 1))
get_disk_info(xml)
network_info = get_network_info(xml)
macs.append(network_info["address"])
ips.append(get_ip_by_arp(network_info["address"]))
lines = ffs(
3,
[
"name",
"cpu",
"mem_actual",
"mem_unused",
"write_B",
"read_B",
"MAC",
"IP",
],
True,
name,
cpu_times,
mem_actual,
mem_unused,
write_bytes,
read_bytes,
macs,
ips,
)
for line in lines:
print(line)
# time.sleep(argparser.args.delay)
if __name__ == "__main__":
main()