diff options
author | terminaldweller <thabogre@gmail.com> | 2023-01-16 12:04:12 +0000 |
---|---|---|
committer | terminaldweller <thabogre@gmail.com> | 2023-01-16 12:04:12 +0000 |
commit | 437fc047e789340ace274159bf2046382b069140 (patch) | |
tree | 2434388b565bdb76b247cc7fa081ced4ebc8b8a6 /bin/tunneltop | |
parent | update (diff) | |
download | scripts-437fc047e789340ace274159bf2046382b069140.tar.gz scripts-437fc047e789340ace274159bf2046382b069140.zip |
update
Diffstat (limited to 'bin/tunneltop')
-rwxr-xr-x | bin/tunneltop | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/bin/tunneltop b/bin/tunneltop new file mode 100755 index 0000000..ebed5d9 --- /dev/null +++ b/bin/tunneltop @@ -0,0 +1,230 @@ +#!/usr/bin/env python +"""A top-like program for monitoring ssh tunnels""" + +import argparse +import asyncio +import enum +import sys +import typing + +import tomllib + + +class Argparser: # pylint: disable=too-few-public-methods + """Argparser class.""" + + def __init__(self): + self.parser = argparse.ArgumentParser() + self.parser.add_argument( + "--config", + "-c", + type=str, + help="the path to the .tunneltop.toml file", + default="/home/devi/.tunneltop.toml", + ) + self.parser.add_argument( + "--noheader", + "-n", + type=bool, + help="dont print the header", + default=False, + ) + self.parser.add_argument( + "--delay", + "-d", + type=float, + help="The delay between updates in seconds", + default=5, + ) + self.args = self.parser.parse_args() + + +# pylint: disable=too-few-public-methods +class Colors(enum.EnumType): + """static color definitions""" + + purple = "\033[95m" + blue = "\033[94m" + green = "\033[92m" + yellow = "\033[93m" + red = "\033[91m" + grey = "\033[1;37m" + darkgrey = "\033[1;30m" + cyan = "\033[1;36m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + blueblue = "\x1b[38;5;24m" + greenie = "\x1b[38;5;23m" + goo = "\x1b[38;5;22m" + screen_clear = "\033c\033[3J" + hide_cursor = "\033[?25l" + + +# pylint: disable=too-many-locals +def ffs( + offset: int, + header_list: typing.Optional[typing.List[str]], + numbered: bool, + *args, +) -> typing.List[str]: + """A simple columnar printer""" + max_column_width = [] + lines = [] + numbers_f: typing.List[int] = [] + dummy = [] + + if sys.stdout.isatty(): + greenie = Colors.greenie + bold = Colors.BOLD + endc = Colors.ENDC + goo = Colors.goo + blueblue = Colors.blueblue + else: + greenie = "" + bold = "" + endc = "" + goo = "" + blueblue = "" + + for arg in args: + max_column_width.append(max(len(repr(argette)) for argette in arg)) + + if header_list is not None: + if numbered: + numbers_f.extend(range(1, len(args[-1]) + 1)) + max_column_width.append( + max(len(repr(number)) for number in numbers_f) + ) + header_list.insert(0, "idx") + + index = range(0, len(header_list)) + for header, width, i in zip(header_list, max_column_width, index): + max_column_width[i] = max(len(header), width) + offset + + for i in index: + dummy.append( + greenie + + bold + + header_list[i].ljust(max_column_width[i]) + + endc + ) + lines.append("".join(dummy)) + dummy.clear() + + index2 = range(0, len(args[-1])) + for i in index2: + if numbered: + dummy.append( + goo + bold + repr(i).ljust(max_column_width[0]) + endc + ) + for arg, width in zip(args, max_column_width[1:]): + dummy.append(blueblue + (arg[i]).ljust(width) + endc) + else: + for arg, width in zip(args, max_column_width): + dummy.append(blueblue + (arg[i]).ljust(width) + endc) + lines.append("".join(dummy)) + dummy.clear() + return lines + + +async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]: + """Run a command in a subshell""" + proc = await asyncio.create_subprocess_shell( + cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + + # return stdout and stderr + return await proc.communicate() + + +def tunnel_test_callback(task: asyncio.Task) -> None: + """Tunnel test callback function.""" + task_name = task.get_name() + # data_cols["stdout"] = task.result()[0] + # data_cols["stderr"] = task.result()[1] + if ( + task.result()[0].decode("utf-8").strip("\n") + == data_cols[task_name]["test_command_result"] + ): + data_cols[task_name]["status"] = "UP" + else: + data_cols[task_name]["status"] = "DOWN" + + +async def tunnel_test_procs() -> typing.List[asyncio.Task]: + """run all the tunnel tests in the background as separate tasks""" + tasks: typing.List[asyncio.Task] = [] + for _, value in data_cols.items(): + if value["test_command"] != "": + tasks.append( + asyncio.create_task( + run_subshell(value["test_command"]), name=value["name"] + ) + ) + tasks[-1].add_done_callback(tunnel_test_callback) + await asyncio.sleep(0) + + return tasks + + +async def tunnel_procs(commands: typing.List[str]) -> None: + """run all the tunnels in the background as separate tasks""" + for command in commands: + asyncio.create_task(run_subshell(command)) + await asyncio.sleep(0) + + +data_cols: typing.Dict[str, typing.Dict] = {} + + +async def main() -> None: + """entrypoint""" + argparser = Argparser() + print(Colors.screen_clear, end="") + print(Colors.hide_cursor, end="") + + with open(argparser.args.config, "rb") as conf_file: + data = tomllib.load(conf_file) + for key, value in data.items(): + data_cols[key] = { + "name": key, + "address": value["address"], + "port": value["port"], + "command": value["command"], + "status": "UNKN", + "test_command": value["test_command"], + "test_command_result": value["test_command_result"], + "test_interval": value["test_interval"], + "test_timeout": value["test_timeout"], + "stdout": "", + "stderr": "", + } + + await tunnel_procs([v["command"] for _, v in data_cols.items()]) + + while True: + await tunnel_test_procs() + lines = ffs( + 2, + ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] + if not argparser.args.noheader + else None, + False, + [v["name"] for _, v in data_cols.items()], + [v["address"] for _, v in data_cols.items()], + [repr(v["port"]) for _, v in data_cols.items()], + [v["status"] for _, v in data_cols.items()], + [v["stdout"] for _, v in data_cols.items()], + [v["stderr"] for _, v in data_cols.items()], + ) + for line in lines: + print(line) + + await asyncio.sleep(argparser.args.delay) + print(Colors.screen_clear, end="") + print(Colors.hide_cursor, end="") + + +if __name__ == "__main__": + asyncio.run(main()) |