aboutsummaryrefslogtreecommitdiffstats
path: root/bin/virttop
diff options
context:
space:
mode:
authorterminaldweller <thabogre@gmail.com>2022-12-26 08:25:09 +0000
committerterminaldweller <thabogre@gmail.com>2022-12-26 08:25:09 +0000
commite0a9a4c74fd973291c5b726ae969714b86491c3f (patch)
tree106afd9dfcce216651efdbb0f6a5319af18efc96 /bin/virttop
parentupdate (diff)
downloadscripts-e0a9a4c74fd973291c5b726ae969714b86491c3f.tar.gz
scripts-e0a9a4c74fd973291c5b726ae969714b86491c3f.zip
update
Diffstat (limited to '')
-rwxr-xr-xbin/virttop221
1 files changed, 133 insertions, 88 deletions
diff --git a/bin/virttop b/bin/virttop
index 4310d5d..ef62c20 100755
--- a/bin/virttop
+++ b/bin/virttop
@@ -6,15 +6,16 @@
# import defusedxml # type:ignore
# defusedxml.defuse_stdlib()
import argparse
+import dataclasses
import subprocess
-import time
+
+# import time
import typing
+from xml.dom.minidom import Document
from defusedxml import ElementTree # type:ignore
from defusedxml import minidom
import libvirt # type:ignore
-hv = ["qemu:///system"]
-
class Argparser: # pylint: disable=too-few-public-methods
"""Argparser class."""
@@ -25,9 +26,17 @@ class Argparser: # pylint: disable=too-few-public-methods
"--delay",
"-d",
type=float,
- help="the delay between updates",
+ help="The delay between updates",
default=5,
)
+ self.parser.add_argument(
+ "--uri",
+ "-u",
+ nargs="+",
+ type=str,
+ help="A list of URIs to connect to seperated by commas",
+ default=["qemu:///system"],
+ )
self.args = self.parser.parse_args()
@@ -51,10 +60,29 @@ class Colors:
goo = "\x1b[38;5;22m"
-def get_network_info(xml) -> typing.Dict[str, str]:
+@dataclasses.dataclass
+# pylint: disable=too-many-instance-attributes
+class VirtData:
+ """Holds the data that we collect to display to the user"""
+
+ name: typing.List[str] = dataclasses.field(default_factory=list)
+ cpu_times: typing.List[str] = dataclasses.field(default_factory=list)
+ mem_actual: typing.List[str] = dataclasses.field(default_factory=list)
+ mem_unused: typing.List[str] = dataclasses.field(default_factory=list)
+ write_bytes: typing.List[str] = dataclasses.field(default_factory=list)
+ read_bytes: typing.List[str] = dataclasses.field(default_factory=list)
+ macs: typing.List[str] = dataclasses.field(default_factory=list)
+ ips: typing.List[str] = dataclasses.field(default_factory=list)
+ disk_reads: typing.List[str] = dataclasses.field(default_factory=list)
+ disk_writes: typing.List[str] = dataclasses.field(default_factory=list)
+
+
+def get_network_info(
+ xml_doc: Document,
+) -> typing.Dict[str, str]:
"""returns the network info"""
result_dict = {}
- interface_types = xml.getElementsByTagName("interface")
+ interface_types = xml_doc.getElementsByTagName("interface")
for interface_type in interface_types:
interface_nodes = interface_type.childNodes
for interface_node in interface_nodes:
@@ -67,29 +95,41 @@ def get_network_info(xml) -> typing.Dict[str, str]:
def get_ip_by_arp(mac: str) -> str:
- """get ip adress using arp"""
+ """Get ip adress using the arp table."""
# arp -n | grep xx:xx:xx:xx:xx:xx | gawk '{print $1}'
try:
- proc1 = subprocess.run(["arp", "-n"], capture_output=True, check=True)
+ proc1 = subprocess.run(
+ ["/usr/bin/arp", "-n"],
+ capture_output=True,
+ check=True,
+ shell=False,
+ )
proc2 = subprocess.run(
- ["grep", mac], input=proc1.stdout, capture_output=True, check=True
+ ["/usr/bin/grep", mac],
+ input=proc1.stdout,
+ capture_output=True,
+ check=True,
+ shell=False,
)
proc3 = subprocess.run(
- ["awk", "{print $1}"],
+ ["/usr/bin/awk", "{print $1}"],
input=proc2.stdout,
capture_output=True,
check=True,
+ shell=False,
)
ip_address = proc3.stdout.decode("utf-8").strip("\n")
- except:
+ except subprocess.CalledProcessError:
ip_address = "N/A"
return ip_address
-def get_disk_info(xml) -> typing.Dict[str, str]:
+def get_disk_info(
+ xml_doc: Document,
+) -> typing.Dict[str, str]:
"""returns the disk info"""
result_dict: typing.Dict = {}
- disk_types = xml.getElementsByTagName("disk")
+ disk_types = xml_doc.getElementsByTagName("disk")
for disk_type in disk_types:
disk_nodes = disk_type.childNodes
for disk_node in disk_nodes:
@@ -102,22 +142,20 @@ def get_disk_info(xml) -> typing.Dict[str, str]:
return result_dict
-def ffs(offset, header_list, numbered, *args):
+def ffs(offset: int, header_list: typing.List[str], numbered: bool, *args):
"""A simple columnar printer"""
max_column_width = []
lines = []
- numbers_f = []
+ numbers_f: typing.List[int] = []
dummy = []
if numbered:
numbers_f.extend(range(1, len(args[-1]) + 1))
- max_column_width.append(
- max([len(repr(number)) for number in numbers_f])
- )
+ max_column_width.append(max(len(repr(number)) for number in numbers_f))
header_list.insert(0, "idx")
for arg in args:
- max_column_width.append(max([len(repr(argette)) for argette in arg]))
+ max_column_width.append(max(len(repr(argette)) for argette in arg))
index = range(0, len(header_list))
for header, width, i in zip(header_list, max_column_width, index):
@@ -144,12 +182,12 @@ def ffs(offset, header_list, numbered, *args):
)
for arg, width in zip(args, max_column_width[1:]):
dummy.append(
- Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC
+ Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC
)
else:
for arg, width in zip(args, max_column_width):
dummy.append(
- Colors.blueblue + repr(arg[i]).ljust(width) + Colors.ENDC
+ Colors.blueblue + (arg[i]).ljust(width) + Colors.ENDC
)
lines.append("".join(dummy))
dummy.clear()
@@ -157,83 +195,90 @@ def ffs(offset, header_list, numbered, *args):
def size_abr(num: float, shift_by: float) -> str:
+ """Rounds and abbreviates floats."""
num = num * shift_by
if num < 1000:
return repr(num)
- elif num < 1_000_000.0:
- return repr(num / 1_000) + " KB"
- elif num < 1_000_000_000:
- return repr(num / 1_000_000) + " MB"
- elif num < 1_000_000_000_000:
- return repr(num / 1_000_000_000) + " GB"
+ if num < 1_000_000.0:
+ return repr(round(num / 1_000, 2)) + " KB"
+ if num < 1_000_000_000:
+ return repr(round(num / 1_000_000, 2)) + " MB"
+ if num < 1_000_000_000_000:
+ return repr(round(num / 1_000_000_000, 2)) + " GB"
return "N/A"
+def fill_virt_data_uri(conn, active_hosts, virt_data: VirtData) -> None:
+ """fill VirtData for one URI."""
+ for host_id in active_hosts:
+ dom = conn.lookupByID(host_id)
+ virt_data.cpu_times.append(
+ repr(
+ int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000)
+ )
+ + "s"
+ )
+ xml_doc = minidom.parseString(dom.XMLDesc())
+ virt_data.name.append(dom.name())
+
+ mem_stats = dom.memoryStats()
+ virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
+ virt_data.mem_unused.append(size_abr(mem_stats["unused"], 1000))
+
+ tree = ElementTree.fromstring(dom.XMLDesc())
+ iface = tree.find("devices/interface/target").get("dev")
+ stats = dom.interfaceStats(iface)
+ virt_data.write_bytes.append(size_abr(stats[4], 1))
+ virt_data.read_bytes.append(size_abr(stats[0], 1))
+
+ disk_info = get_disk_info(xml_doc)
+ image_name = disk_info["file"]
+ _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
+ virt_data.disk_reads.append(size_abr(rd_bytes, 1))
+ virt_data.disk_writes.append(size_abr(wr_bytes, 1))
+
+ network_info = get_network_info(xml_doc)
+ virt_data.macs.append(network_info["address"])
+ virt_data.ips.append(get_ip_by_arp(network_info["address"]))
+
+
def main() -> None:
"""entrypoint"""
- # argparser = Argparser()
- name = []
- cpu_times = []
- mem_actual = []
- mem_unused = []
- write_bytes = []
- read_bytes = []
- macs = []
- ips = []
- for hv_host in hv:
+ argparser = Argparser()
+ virt_data = VirtData()
+ for hv_host in argparser.args.uri:
conn = libvirt.openReadOnly(hv_host)
active_hosts = conn.listDomainsID()
- for host_id in active_hosts:
- dom = conn.lookupByID(host_id)
- cpu_times.append(
- repr(
- int(
- dom.getCPUStats(total=True)[0]["cpu_time"]
- / 1_000_000_000
- )
- )
- + " s"
- )
- raw_xml = dom.XMLDesc()
- xml = minidom.parseString(raw_xml)
- name.append(dom.name())
- mem_stats = dom.memoryStats()
- mem_actual.append(size_abr(mem_stats["actual"], 1000))
- mem_unused.append(size_abr(mem_stats["unused"], 1000))
- tree = ElementTree.fromstring(dom.XMLDesc())
- iface = tree.find("devices/interface/target").get("dev")
- stats = dom.interfaceStats(iface)
- write_bytes.append(size_abr(stats[4], 1))
- read_bytes.append(size_abr(stats[0], 1))
- get_disk_info(xml)
- network_info = get_network_info(xml)
- macs.append(network_info["address"])
- ips.append(get_ip_by_arp(network_info["address"]))
-
- lines = ffs(
- 3,
- [
- "name",
- "cpu",
- "mem_actual",
- "mem_unused",
- "write_B",
- "read_B",
- "MAC",
- "IP",
- ],
- True,
- name,
- cpu_times,
- mem_actual,
- mem_unused,
- write_bytes,
- read_bytes,
- macs,
- ips,
- )
- for line in lines:
- print(line)
+ fill_virt_data_uri(conn, active_hosts, virt_data)
+
+ lines = ffs(
+ 3,
+ [
+ "name",
+ "cpu",
+ "mem_actual",
+ "mem_unused",
+ "net_write_B",
+ "net_read_B",
+ "MAC",
+ "IP",
+ "IO_read_B",
+ "IO_write_B",
+ ],
+ True,
+ virt_data.name,
+ virt_data.cpu_times,
+ virt_data.mem_actual,
+ virt_data.mem_unused,
+ virt_data.write_bytes,
+ virt_data.read_bytes,
+ virt_data.macs,
+ virt_data.ips,
+ virt_data.disk_reads,
+ virt_data.disk_writes,
+ )
+ for line in lines:
+ print(line)
# time.sleep(argparser.args.delay)