aboutsummaryrefslogtreecommitdiffstats
path: root/bin/virttop
diff options
context:
space:
mode:
Diffstat (limited to 'bin/virttop')
-rwxr-xr-xbin/virttop376
1 files changed, 0 insertions, 376 deletions
diff --git a/bin/virttop b/bin/virttop
deleted file mode 100755
index 4d8943b..0000000
--- a/bin/virttop
+++ /dev/null
@@ -1,376 +0,0 @@
-#!/usr/bin/env python
-"""virt top"""
-
-# ideally we would like to use the monkeypatch but it is untested
-# and experimental
-# import defusedxml # type:ignore
-# defusedxml.defuse_stdlib()
-import argparse
-import csv
-import dataclasses
-import enum
-import os
-import signal
-import sys
-import time
-import typing
-
-# we are only using this for type annotation
-from xml.dom.minidom import Document # nosec
-
-from defusedxml import ElementTree # type:ignore
-from defusedxml import minidom
-import libvirt # type:ignore
-
-
-# pylint: disable=unused-argument
-def sig_handler_sigint(signum, frame):
- """Just to handle C-c gracefully"""
- print()
- sys.exit(0)
-
-
-class Argparser: # pylint: disable=too-few-public-methods
- """Argparser class."""
-
- def __init__(self):
- self.parser = argparse.ArgumentParser()
- self.parser.add_argument(
- "--delay",
- "-d",
- type=float,
- help="The delay between updates",
- default=5,
- )
- self.parser.add_argument(
- "--uri",
- "-u",
- nargs="+",
- type=str,
- help="A list of URIs to connect to seperated by commas",
- default=["qemu:///system"],
- )
- self.args = self.parser.parse_args()
-
-
-# pylint: disable=too-few-public-methods
-class Colors(enum.EnumType):
- """static color definitions"""
-
- purple = "\033[95m"
- blue = "\033[94m"
- green = "\033[92m"
- yellow = "\033[93m"
- red = "\033[91m"
- grey = "\033[1;37m"
- darkgrey = "\033[1;30m"
- cyan = "\033[1;36m"
- ENDC = "\033[0m"
- BOLD = "\033[1m"
- UNDERLINE = "\033[4m"
- blueblue = "\x1b[38;5;24m"
- greenie = "\x1b[38;5;23m"
- goo = "\x1b[38;5;22m"
- screen_clear = "\033c\033[3J"
- hide_cursor = "\033[?25l"
-
-
-@dataclasses.dataclass
-# pylint: disable=too-many-instance-attributes
-class VirtData:
- """Holds the data that we collect to display to the user"""
-
- vm_id: typing.List[str] = dataclasses.field(default_factory=list)
- name: typing.List[str] = dataclasses.field(default_factory=list)
- cpu_times: typing.List[str] = dataclasses.field(default_factory=list)
- mem_actual: typing.List[str] = dataclasses.field(default_factory=list)
- mem_unused: typing.List[str] = dataclasses.field(default_factory=list)
- write_bytes: typing.List[str] = dataclasses.field(default_factory=list)
- read_bytes: typing.List[str] = dataclasses.field(default_factory=list)
- macs: typing.List[str] = dataclasses.field(default_factory=list)
- ips: typing.List[str] = dataclasses.field(default_factory=list)
- disk_reads: typing.List[str] = dataclasses.field(default_factory=list)
- disk_writes: typing.List[str] = dataclasses.field(default_factory=list)
- snapshot_counts: typing.List[str] = dataclasses.field(default_factory=list)
- uri: typing.List[str] = dataclasses.field(default_factory=list)
- memory_pool: typing.List[str] = dataclasses.field(default_factory=list)
-
- pools: typing.List[libvirt.virStoragePool] = dataclasses.field(
- default_factory=list
- )
-
-
-def get_network_info(
- xml_doc: Document,
-) -> typing.Dict[str, str]:
- """returns the network info"""
- result_dict = {}
- interface_types = xml_doc.getElementsByTagName("interface")
- for interface_type in interface_types:
- interface_nodes = interface_type.childNodes
- for interface_node in interface_nodes:
- if interface_node.nodeName[0:1] != "#":
- for attr in interface_node.attributes.keys():
- result_dict[
- interface_node.attributes[attr].name
- ] = interface_node.attributes[attr].value
- return result_dict
-
-
-def get_arp_table() -> typing.Dict[str, str]:
- """Get the ARP table. return a dict with MAC/IP as key/value pair."""
- result: typing.Dict[str, str] = {}
- with open("/proc/net/arp", encoding="utf-8") as arp_table:
- reader = csv.reader(arp_table, skipinitialspace=True, delimiter=" ")
- for arp_entry in reader:
- result[arp_entry[3]] = arp_entry[0]
-
- return result
-
-
-def get_ip_from_arp_table(arp_table: typing.Dict[str, str], mac: str) -> str:
- """get IP from MAC address using the arp table"""
- try:
- return arp_table[mac]
- except KeyError:
- return "N/A"
-
-
-def get_disk_info(
- xml_doc: Document,
-) -> typing.Dict[str, str]:
- """returns the disk info"""
- result_dict: typing.Dict = {}
- disk_types = xml_doc.getElementsByTagName("disk")
- for disk_type in disk_types:
- disk_nodes = disk_type.childNodes
- for disk_node in disk_nodes:
- if disk_node.nodeName[0:1] != "#":
- for attr in disk_node.attributes.keys():
- result_dict[
- disk_node.attributes[attr].name
- ] = disk_node.attributes[attr].value
-
- return result_dict
-
-
-# pylint: disable=too-many-locals
-def ffs(
- offset: int,
- header_list: typing.Optional[typing.List[str]],
- numbered: bool,
- *args,
-) -> typing.List[str]:
- """A simple columnar printer"""
- max_column_width = []
- lines = []
- numbers_f: typing.List[int] = []
- dummy = []
-
- if sys.stdout.isatty():
- greenie = Colors.greenie
- bold = Colors.BOLD
- endc = Colors.ENDC
- goo = Colors.goo
- blueblue = Colors.blueblue
- else:
- greenie = ""
- bold = ""
- endc = ""
- goo = ""
- blueblue = ""
-
- for arg in args:
- max_column_width.append(max(len(repr(argette)) for argette in arg))
-
- if header_list is not None:
- if numbered:
- numbers_f.extend(range(1, len(args[-1]) + 1))
- max_column_width.append(
- max(len(repr(number)) for number in numbers_f)
- )
- header_list.insert(0, "idx")
-
- index = range(0, len(header_list))
- for header, width, i in zip(header_list, max_column_width, index):
- max_column_width[i] = max(len(header), width) + offset
-
- for i in index:
- dummy.append(
- greenie
- + bold
- + header_list[i].ljust(max_column_width[i])
- + endc
- )
- lines.append("".join(dummy))
- dummy.clear()
-
- index2 = range(0, len(args[-1]))
- for i in index2:
- if numbered:
- dummy.append(
- goo + bold + repr(i).ljust(max_column_width[0]) + endc
- )
- for arg, width in zip(args, max_column_width[1:]):
- dummy.append(blueblue + (arg[i]).ljust(width) + endc)
- else:
- for arg, width in zip(args, max_column_width):
- dummy.append(blueblue + (arg[i]).ljust(width) + endc)
- lines.append("".join(dummy))
- dummy.clear()
- return lines
-
-
-def size_abr(num: float, shift_by: float) -> str:
- """Rounds and abbreviates floats."""
- num = num * shift_by
- if num < 1000:
- return repr(num)
- if num < 1_000_000.0:
- return repr(round(num / 1_000, 2)) + " KB"
- if num < 1_000_000_000:
- return repr(round(num / 1_000_000, 2)) + " MB"
- if num < 1_000_000_000_000:
- return repr(round(num / 1_000_000_000, 2)) + " GB"
- return "N/A"
-
-
-# pylint: disable=too-many-locals
-def fill_virt_data_uri(
- conn: libvirt.virConnect,
- active_hosts: typing.List[int],
- virt_data: VirtData,
- arp_table: typing.Dict[str, str],
-) -> None:
- """fill VirtData for one URI."""
- for host_id in active_hosts:
- virt_data.uri.append(conn.getURI())
- dom = conn.lookupByID(host_id)
- virt_data.snapshot_counts.append(repr(dom.snapshotNum()))
- virt_data.cpu_times.append(
- repr(
- int(dom.getCPUStats(total=True)[0]["cpu_time"] / 1_000_000_000)
- )
- + "s"
- )
- xml_doc = minidom.parseString(dom.XMLDesc())
- virt_data.name.append(dom.name())
-
- mem_stats = dom.memoryStats()
- if "actual" in mem_stats:
- virt_data.mem_actual.append(size_abr(mem_stats["actual"], 1000))
- else:
- virt_data.mem_actual.append("n/a")
-
- # BSD guests dont support mem balloons?
- try:
- virt_data.mem_unused.append(size_abr(mem_stats["available"], 1000))
- except KeyError:
- virt_data.mem_unused.append("N/A")
-
- tree = ElementTree.fromstring(dom.XMLDesc())
- iface = tree.find("devices/interface/target").get("dev")
- stats = dom.interfaceStats(iface)
- virt_data.write_bytes.append(size_abr(stats[4], 1))
- virt_data.read_bytes.append(size_abr(stats[0], 1))
-
- found_the_pool: bool = False
- disk = tree.find("devices/disk/source").get("file")
- for pool in virt_data.pools:
- if os.path.basename(disk) in pool.listVolumes():
- virt_data.memory_pool.append(pool.name())
- found_the_pool = True
- # you could delete the pool but keep the volumes inside
- # which results in a functional VM but it wont have a
- # volume inside a pool that we can detect
- if not found_the_pool:
- virt_data.memory_pool.append("N/A")
-
- disk_info = get_disk_info(xml_doc)
- image_name = disk_info["file"]
- _, rd_bytes, _, wr_bytes, _ = dom.blockStats(image_name)
- virt_data.disk_reads.append(size_abr(rd_bytes, 1))
- virt_data.disk_writes.append(size_abr(wr_bytes, 1))
-
- network_info = get_network_info(xml_doc)
- virt_data.macs.append(network_info["address"])
- # virt_data.ips.append(get_ip_by_arp(network_info["address"]))
- # TODO-this is obviously not going to work for remote URIs
- virt_data.ips.append(
- get_ip_from_arp_table(arp_table, network_info["address"])
- )
-
-
-def main() -> None:
- """entrypoint"""
- signal.signal(signal.SIGINT, sig_handler_sigint)
- argparser = Argparser()
- print(Colors.screen_clear, end="")
- while True:
- virt_data = VirtData()
- arp_table = get_arp_table()
- for hv_host in argparser.args.uri:
- conn = libvirt.openReadOnly(hv_host)
- active_hosts = conn.listDomainsID()
- if len(active_hosts) > 0:
- virt_data.pools = conn.listAllStoragePools()
- # for pool in virt_data.pools:
- # print(pool.listVolumes())
- # networks = conn.listAllNetworks()
- # print([pool.name() for pool in conn.listAllStoragePools()])
- # print([net.name() for net in conn.listAllNetworks()])
- virt_data.vm_id = [
- repr(vm_id) for vm_id in conn.listDomainsID()
- ]
- fill_virt_data_uri(conn, active_hosts, virt_data, arp_table)
- # for conn_id in conn.listAllDomains():
- # print(conn_id.name())
- # print(conn_id.state())
- else:
- print("no active VMs found.")
- sys.exit(1)
-
- lines = ffs(
- 2,
- [
- "ID",
- "NAME",
- "CPU",
- "MEM_ACTUAL",
- "MEM_AVAIL",
- "NET_WRITE_B",
- "NET_READ_B",
- "MAC",
- "IP",
- "IO_READ_B",
- "IO_WRITE_B",
- "SNAPSHOTS",
- "URI",
- "STORAGE_POOL",
- ],
- False,
- virt_data.vm_id,
- virt_data.name,
- virt_data.cpu_times,
- virt_data.mem_actual,
- virt_data.mem_unused,
- virt_data.write_bytes,
- virt_data.read_bytes,
- virt_data.macs,
- virt_data.ips,
- virt_data.disk_reads,
- virt_data.disk_writes,
- virt_data.snapshot_counts,
- virt_data.uri,
- virt_data.memory_pool,
- )
- for line in lines:
- print(line)
- time.sleep(argparser.args.delay)
- # clears the screen
- print(Colors.screen_clear, end="")
- print(Colors.hide_cursor, end="")
-
-
-if __name__ == "__main__":
- main()