#!/usr/bin/env python
"""A top-like program for monitoring ssh tunnels"""
import argparse
import asyncio
import enum
import sys
import typing
import tomllib
class Argparser: # pylint: disable=too-few-public-methods
"""Argparser class."""
def __init__(self):
self.parser = argparse.ArgumentParser()
self.parser.add_argument(
"--config",
"-c",
type=str,
help="the path to the .tunneltop.toml file",
default="/home/devi/.tunneltop.toml",
)
self.parser.add_argument(
"--noheader",
"-n",
type=bool,
help="dont print the header",
default=False,
)
self.parser.add_argument(
"--delay",
"-d",
type=float,
help="The delay between updates in seconds",
default=5,
)
self.args = self.parser.parse_args()
# pylint: disable=too-few-public-methods
class Colors(enum.EnumType):
"""static color definitions"""
purple = "\033[95m"
blue = "\033[94m"
green = "\033[92m"
yellow = "\033[93m"
red = "\033[91m"
grey = "\033[1;37m"
darkgrey = "\033[1;30m"
cyan = "\033[1;36m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
blueblue = "\x1b[38;5;24m"
greenie = "\x1b[38;5;23m"
goo = "\x1b[38;5;22m"
screen_clear = "\033c\033[3J"
hide_cursor = "\033[?25l"
# pylint: disable=too-many-locals
def ffs(
offset: int,
header_list: typing.Optional[typing.List[str]],
numbered: bool,
*args,
) -> typing.List[str]:
"""A simple columnar printer"""
max_column_width = []
lines = []
numbers_f: typing.List[int] = []
dummy = []
if sys.stdout.isatty():
greenie = Colors.greenie
bold = Colors.BOLD
endc = Colors.ENDC
goo = Colors.goo
blueblue = Colors.blueblue
else:
greenie = ""
bold = ""
endc = ""
goo = ""
blueblue = ""
for arg in args:
max_column_width.append(max(len(repr(argette)) for argette in arg))
if header_list is not None:
if numbered:
numbers_f.extend(range(1, len(args[-1]) + 1))
max_column_width.append(
max(len(repr(number)) for number in numbers_f)
)
header_list.insert(0, "idx")
index = range(0, len(header_list))
for header, width, i in zip(header_list, max_column_width, index):
max_column_width[i] = max(len(header), width) + offset
for i in index:
dummy.append(
greenie
+ bold
+ header_list[i].ljust(max_column_width[i])
+ endc
)
lines.append("".join(dummy))
dummy.clear()
index2 = range(0, len(args[-1]))
for i in index2:
if numbered:
dummy.append(
goo + bold + repr(i).ljust(max_column_width[0]) + endc
)
for arg, width in zip(args, max_column_width[1:]):
dummy.append(blueblue + (arg[i]).ljust(width) + endc)
else:
for arg, width in zip(args, max_column_width):
dummy.append(blueblue + (arg[i]).ljust(width) + endc)
lines.append("".join(dummy))
dummy.clear()
return lines
async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]:
"""Run a command in a subshell"""
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
# return stdout and stderr
return await proc.communicate()
def tunnel_test_callback(task: asyncio.Task) -> None:
"""Tunnel test callback function."""
task_name = task.get_name()
# data_cols["stdout"] = task.result()[0]
# data_cols["stderr"] = task.result()[1]
if (
task.result()[0].decode("utf-8").strip("\n")
== data_cols[task_name]["test_command_result"]
):
data_cols[task_name]["status"] = "UP"
else:
data_cols[task_name]["status"] = "DOWN"
async def tunnel_test_procs() -> typing.List[asyncio.Task]:
"""run all the tunnel tests in the background as separate tasks"""
tasks: typing.List[asyncio.Task] = []
for _, value in data_cols.items():
if value["test_command"] != "":
tasks.append(
asyncio.create_task(
run_subshell(value["test_command"]), name=value["name"]
)
)
tasks[-1].add_done_callback(tunnel_test_callback)
await asyncio.sleep(0)
return tasks
async def tunnel_procs(commands: typing.List[str]) -> None:
"""run all the tunnels in the background as separate tasks"""
for command in commands:
asyncio.create_task(run_subshell(command))
await asyncio.sleep(0)
data_cols: typing.Dict[str, typing.Dict] = {}
async def main() -> None:
"""entrypoint"""
argparser = Argparser()
print(Colors.screen_clear, end="")
print(Colors.hide_cursor, end="")
with open(argparser.args.config, "rb") as conf_file:
data = tomllib.load(conf_file)
for key, value in data.items():
data_cols[key] = {
"name": key,
"address": value["address"],
"port": value["port"],
"command": value["command"],
"status": "UNKN",
"test_command": value["test_command"],
"test_command_result": value["test_command_result"],
"test_interval": value["test_interval"],
"test_timeout": value["test_timeout"],
"stdout": "",
"stderr": "",
}
await tunnel_procs([v["command"] for _, v in data_cols.items()])
while True:
await tunnel_test_procs()
lines = ffs(
2,
["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"]
if not argparser.args.noheader
else None,
False,
[v["name"] for _, v in data_cols.items()],
[v["address"] for _, v in data_cols.items()],
[repr(v["port"]) for _, v in data_cols.items()],
[v["status"] for _, v in data_cols.items()],
[v["stdout"] for _, v in data_cols.items()],
[v["stderr"] for _, v in data_cols.items()],
)
for line in lines:
print(line)
await asyncio.sleep(argparser.args.delay)
print(Colors.screen_clear, end="")
print(Colors.hide_cursor, end="")
if __name__ == "__main__":
asyncio.run(main())