From 7cee58615f3a1bdd08273dbf2fc678414bd7b264 Mon Sep 17 00:00:00 2001 From: terminaldweller Date: Mon, 30 Jan 2023 17:03:51 +0330 Subject: initial commit --- pyproject.toml | 7 +- tunneltop | 555 -------------------------------------------------------- tunneltop.py | 558 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 561 insertions(+), 559 deletions(-) delete mode 100755 tunneltop create mode 100755 tunneltop.py diff --git a/pyproject.toml b/pyproject.toml index d571a99..851b3cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,13 +18,12 @@ packages = [ {include = "tunneltop"} ] -[tool.poetry.dependencies] -python = "^3.11" - - [tool.poetry.scripts] tunneltop = "tunneltop:main" +[tool.poetry.dependencies] +python = "^3.11" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/tunneltop b/tunneltop deleted file mode 100755 index e1b32c1..0000000 --- a/tunneltop +++ /dev/null @@ -1,555 +0,0 @@ -#!/usr/bin/env python -"""A top-like program for monitoring ssh tunnels""" -# TODO- the disabled coloring is not working -# TODO- quit doesnt work -import argparse -import asyncio -import copy -import curses -import enum -import os -import signal -import sys -import tomllib -import typing - - -class Argparser: # pylint: disable=too-few-public-methods - """Argparser class.""" - - def __init__(self): - self.parser = argparse.ArgumentParser() - self.parser.add_argument( - "--config", - "-c", - type=str, - help="The path to the .tunneltop.toml file," - " defaults to $HOME/.tunneltop.toml", - default="~/.tunneltop.toml", - ) - self.parser.add_argument( - "--noheader", - "-n", - type=bool, - help="Dont print the header in the output", - default=False, - ) - self.parser.add_argument( - "--delay", - "-d", - type=float, - help="The delay between redraws in seconds, defaults to 5 seconds", - default=5, - ) - self.args = self.parser.parse_args() - - -# pylint: disable=too-few-public-methods -class Colors(enum.EnumType): - """static color definitions""" - - purple = "\033[95m" - blue = "\033[94m" - green = "\033[92m" - yellow = "\033[93m" - red = "\033[91m" - grey = "\033[1;37m" - darkgrey = "\033[1;30m" - cyan = "\033[1;36m" - ENDC = "\033[0m" - BOLD = "\033[1m" - UNDERLINE = "\033[4m" - blueblue = "\x1b[38;5;24m" - greenie = "\x1b[38;5;23m" - goo = "\x1b[38;5;22m" - screen_clear = "\033c\033[3J" - hide_cursor = "\033[?25l" - - -# pylint: disable=too-many-locals -def ffs( - offset: int, - header_list: typing.Optional[typing.List[str]], - numbered: bool, - no_color: bool, - *args, -) -> typing.List[str]: - """A simple columnar printer""" - max_column_width = [] - lines = [] - numbers_f: typing.List[int] = [] - dummy = [] - - if no_color or not sys.stdout.isatty(): - greenie = "" - bold = "" - endc = "" - goo = "" - blueblue = "" - else: - greenie = Colors.greenie - bold = Colors.BOLD - endc = Colors.ENDC - goo = Colors.goo - blueblue = Colors.blueblue - - for arg in args: - max_column_width.append(max(len(repr(argette)) for argette in arg)) - - if header_list is not None: - if numbered: - numbers_f.extend(range(1, len(args[-1]) + 1)) - max_column_width.append( - max(len(repr(number)) for number in numbers_f) - ) - header_list.insert(0, "idx") - - index = range(0, len(header_list)) - for header, width, i in zip(header_list, max_column_width, index): - max_column_width[i] = max(len(header), width) + offset - - for i in index: - dummy.append( - greenie - + bold - + header_list[i].ljust(max_column_width[i]) - + endc - ) - lines.append("".join(dummy)) - dummy.clear() - - index2 = range(0, len(args[-1])) - for i in index2: - if numbered: - dummy.append( - goo + bold + repr(i).ljust(max_column_width[0]) + endc - ) - for arg, width in zip(args, max_column_width[1:]): - dummy.append(blueblue + (arg[i]).ljust(width) + endc) - else: - for arg, width in zip(args, max_column_width): - dummy.append(blueblue + (arg[i]).ljust(width) + endc) - lines.append("".join(dummy)) - dummy.clear() - return lines - - -def render( - data_cols: typing.Dict[str, typing.Dict[str, str]], - tasks: typing.List[asyncio.Task], - stdscr, - sel: int, -): - """Render the text""" - lines = ffs( - 2, - ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"], - False, - True, - [v["name"] for _, v in data_cols.items()], - [v["address"] for _, v in data_cols.items()], - [repr(v["port"]) for _, v in data_cols.items()], - [v["status"] for _, v in data_cols.items()], - [v["stdout"] for _, v in data_cols.items()], - [v["stderr"] for _, v in data_cols.items()], - ) - iterator = iter(lines) - stdscr.addstr(1, 1, lines[0], curses.color_pair(3)) - next(iterator) - for i, line in enumerate(iterator): - try: - line_content = stdscr.instr(sel + 2, 1).decode("utf-8") - name: str = line_content[: line_content.find(" ")] - finally: - name = "" - if i == sel: - stdscr.addstr( - (2 + i) % (len(lines) + 1), - 1, - line, - curses.color_pair(2) - if name not in tasks - else curses.color_pair(5), - ) - else: - stdscr.addstr( - 2 + i, - 1, - line, - curses.color_pair(1) - if name not in tasks - else curses.color_pair(4), - ) - stdscr.addstr("\n") - stdscr.box() - - -def curses_init(): - """Initialize ncurses""" - stdscr = curses.initscr() - curses.start_color() - curses.use_default_colors() - curses.curs_set(False) - curses.noecho() - curses.cbreak() - stdscr.keypad(True) - curses.halfdelay(20) - curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) - curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN) - curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK) - curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) - curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_CYAN) - - return stdscr - - -class TunnelManager: - """The tunnel top class""" - - def __init__(self): - self.argparser = Argparser() - self.data_cols: typing.Dict[ - str, typing.Dict[str, str] - ] = self.read_conf() - self.tunnel_tasks: typing.List[asyncio.Task] = [] - self.tunnel_test_tasks: typing.List[asyncio.Task] = [] - self.scheduler_task: asyncio.Task - self.scheduler_table: typing.Dict[ - str, int - ] = self.init_scheduler_table() - # we use this when its time to quit. this will prevent any - # new tasks from being scheduled - self.are_we_dying: bool = False - - def init_scheduler_table(self) -> typing.Dict[str, int]: - """initialize the scheduler table""" - result: typing.Dict[str, int] = {} - for key, value in self.data_cols.items(): - if "test_interval" in value and value["test_command"] != "": - result[key] = 0 - - return result - - async def stop_task( - self, - delete_task: asyncio.Task, - task_list: typing.List[asyncio.Task], - delete: bool = True, - ): - """Remove the reference""" - delete_index: int = -1 - delete_task.cancel() - self.write_log(f"{delete_task.get_name()} is being cancelled\n") - await asyncio.sleep(0) - for i, task in enumerate(task_list): - if task.get_name() == delete_task.get_name(): - delete_index = i - break - - if delete and delete_index >= 0: - task_list.remove(self.tunnel_tasks[delete_index]) - - def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]: - """Read the config file""" - data_cols: typing.Dict[str, typing.Dict[str, str]] = {} - with open( - os.path.expanduser(self.argparser.args.config), "rb" - ) as conf_file: - data = tomllib.load(conf_file) - for key, value in data.items(): - data_cols[key] = { - "name": key, - "address": value["address"], - "port": value["port"], - "command": value["command"], - "status": "UNKN", - "test_command": value["test_command"], - "test_command_result": value["test_command_result"], - "test_interval": value["test_interval"], - "test_timeout": value["test_timeout"], - "stdout": "", - "stderr": "", - } - return data_cols - - async def run_subprocess(self, cmd: str) -> typing.Tuple[bytes, bytes]: - """Run a command""" - try: - proc = await asyncio.create_subprocess_exec( - *cmd.split(" "), - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - return await proc.communicate() - except asyncio.TimeoutError: - proc.terminate() - return (bytes(), bytes()) - except asyncio.CancelledError: - proc.terminate() - raise - - async def run_test_coro( - self, cmd: str, task_name: str - ) -> typing.Tuple[bytes, bytes]: - """Run a test command""" - try: - stdout, stderr = await self.run_subprocess(cmd) - stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"') - stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"') - - self.data_cols[task_name]["stdout"] = stdout_str - self.data_cols[task_name]["stderr"] = stderr_str - if stdout_str == self.data_cols[task_name]["test_command_result"]: - self.data_cols[task_name]["status"] = "UP" - else: - self.data_cols[task_name]["status"] = "DOWN" - - return stdout, stderr - except asyncio.TimeoutError: - self.data_cols[task_name]["status"] = "TMOUT" - raise - - async def tunnel_procs( - self, - ) -> typing.List[asyncio.Task]: - """run all the tunnels in the background as separate tasks""" - tasks: typing.List[asyncio.Task] = [] - for _, value in self.data_cols.items(): - tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), name=value["name"] - ), - ) - await asyncio.sleep(0) - - return tasks - - async def sighup_handler_async_worker(self, data_cols_new) -> None: - """Handles the actual updating of tasks when we get SIGTERM""" - delete_task: typing.Optional[asyncio.Task] = None - for k, value in data_cols_new.items(): - if k not in self.data_cols: - self.tunnel_tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), name=k - ) - ) - await asyncio.sleep(0) - self.data_cols[k] = copy.deepcopy(value) - if k in self.scheduler_table: - self.scheduler_table[k] = 0 - else: - if ( - self.data_cols[k]["command"] != data_cols_new[k]["command"] - or self.data_cols[k]["port"] != data_cols_new[k]["port"] - or self.data_cols[k]["address"] - != data_cols_new[k]["address"] - ): - for task in self.tunnel_tasks: - if task.get_name() == k: - delete_task = task - break - # task.cancel() - # await asyncio.sleep(0) - - if delete_task is not None: - await self.stop_task(delete_task, self.tunnel_tasks) - delete_task = None - self.data_cols[k] = copy.deepcopy(data_cols_new[k]) - self.tunnel_tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), name=k - ) - ) - if k in self.scheduler_table: - self.scheduler_table[k] = 0 - await asyncio.sleep(0) - - for k, _ in self.data_cols.items(): - if k not in data_cols_new: - for task in self.tunnel_tasks: - if task.get_name() == k: - # task.cancel() - # await asyncio.sleep(0) - delete_task = task - break - if delete_task is not None: - await self.stop_task(delete_task, self.tunnel_tasks) - delete_task = None - del self.data_cols[k] - if k in self.scheduler_table: - del self.scheduler_table[k] - - async def sighup_handler(self) -> None: - """SIGHUP handler. we want to reload the config.""" - # type: ignore # pylint: disable=E0203 - data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} - data_cols_new = self.read_conf() - await self.sighup_handler_async_worker(data_cols_new) - - def write_log(self, log: str): - """A simple logger""" - with open( - "/home/devi/devi/abbatoir/hole15/log", - "a", - encoding="utf-8", - ) as logfile: - logfile.write(log) - - async def restart_task(self, line_content: str) -> None: - """restart a task""" - name: str = line_content[: line_content.find(" ")] - # was_cancelled: bool = False - for task in self.tunnel_tasks: - if task.get_name() == name: - # was_cancelled = task.cancel() - # self.write_log(f"was_cancelled: {was_cancelled}") - await self.stop_task(task, self.tunnel_tasks) - # await task - # await asyncio.sleep(0) - for _, value in self.data_cols.items(): - if value["name"] == name and task.cancelled(): - self.tunnel_tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), - name=value["name"], - ) - ) - await asyncio.sleep(0) - - async def flip_task(self, line_content: str) -> None: - """flip a task""" - name: str = line_content[: line_content.find(" ")] - # was_cancelled: bool = False - was_active: bool = False - for task in self.tunnel_tasks: - if task.get_name() == name: - await self.stop_task(task, self.tunnel_tasks) - # was_cancelled = task.cancel() - # await asyncio.sleep(0) - # self.write_log(f"was_cancelled: {was_cancelled}") - # await task - was_active = True - break - - if not was_active: - for _, value in self.data_cols.items(): - if value["name"] == name: - self.tunnel_tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), - name=value["name"], - ) - ) - await asyncio.sleep(0) - - async def quit(self) -> None: - """Cleanly quit the applicaiton""" - # scheduler checks for this so stop making new tasks - # when we want to quit - self.are_we_dying = True - - for task in asyncio.all_tasks(): - task.cancel() - await asyncio.sleep(0) - try: - await asyncio.gather(*asyncio.all_tasks()) - finally: - sys.exit(0) - - async def scheduler(self) -> None: - """schedulaer manages running the tests and reviving dead tunnels""" - try: - while True: - if self.are_we_dying: - return - for key, value in self.scheduler_table.items(): - if value == 0 and key not in self.tunnel_test_tasks: - tunnel_entry = self.data_cols[key] - test_task = asyncio.create_task( - asyncio.wait_for( - self.run_test_coro( - tunnel_entry["test_command"], - tunnel_entry["name"], - ), - timeout=float(tunnel_entry["test_timeout"]), - ), - name=key, - ) - self.tunnel_test_tasks.append(test_task) - self.scheduler_table[key] = int( - tunnel_entry["test_interval"] - ) - await asyncio.sleep(0) - else: - self.scheduler_table[key] = ( - self.scheduler_table[key] - 1 - ) - - # we are using a 1 second ticker. basically the scheduler - # runs every second instead of as fast as it can - await asyncio.sleep(1) - except asyncio.CancelledError: - pass - - async def tui_loop(self) -> None: - """the tui loop""" - sel: int = 0 - try: - stdscr = curses_init() - # we spawn the tunnels and the test scheduler put them - # in the background and then run the TUI loop - self.tunnel_tasks = await self.tunnel_procs() - self.scheduler_task = asyncio.create_task( - self.scheduler(), name="scheduler" - ) - loop = asyncio.get_event_loop() - loop.add_signal_handler( - signal.SIGHUP, - lambda: asyncio.create_task(self.sighup_handler()), - ) - - while True: - stdscr.clear() - render(self.data_cols, self.tunnel_tasks, stdscr, sel) - char = stdscr.getch() - - if char == ord("j") or char == curses.KEY_DOWN: - sel = (sel + 1) % len(self.data_cols) - elif char == ord("k") or char == curses.KEY_UP: - sel = (sel - 1) % len(self.data_cols) - elif char == ord("g") or char == curses.KEY_UP: - sel = 0 - elif char == ord("G") or char == curses.KEY_UP: - sel = len(self.data_cols) - 1 - elif char == ord("r"): - line_content = stdscr.instr(sel + 2, 1) - await self.restart_task(line_content.decode("utf-8")) - elif char == ord("q"): - await self.quit() - elif char == ord("s"): - line_content = stdscr.instr(sel + 2, 1) - await self.flip_task(line_content.decode("utf-8")) - - stdscr.refresh() - await asyncio.sleep(0) - finally: - curses.nocbreak() - stdscr.keypad(False) - curses.echo() - curses.endwin() - # tasks = asyncio.all_tasks() - # for task in tasks: - # task.cancel() - await self.quit() - - -def main() -> None: - """entry point""" - tunnel_manager = TunnelManager() - asyncio.run(tunnel_manager.tui_loop()) - - -if __name__ == "__main__": - main() diff --git a/tunneltop.py b/tunneltop.py new file mode 100755 index 0000000..b752fbf --- /dev/null +++ b/tunneltop.py @@ -0,0 +1,558 @@ +#!/usr/bin/env python +"""A top-like program for monitoring ssh tunnels or any tunnels""" +# TODO- the disabled coloring is not working +# TODO- quit doesnt work +# TODO- we are reviving dead tunnels +import argparse +import asyncio +import copy +import curses +import enum +import os +import signal +import sys +import tomllib +import typing + + +class Argparser: # pylint: disable=too-few-public-methods + """Argparser class.""" + + def __init__(self): + self.parser = argparse.ArgumentParser() + self.parser.add_argument( + "--config", + "-c", + type=str, + help="The path to the .tunneltop.toml file," + " defaults to $HOME/.tunneltop.toml", + default="~/.tunneltop.toml", + ) + self.parser.add_argument( + "--noheader", + "-n", + type=bool, + help="Dont print the header in the output", + default=False, + ) + self.parser.add_argument( + "--delay", + "-d", + type=float, + help="The delay between redraws in seconds, defaults to 5 seconds", + default=5, + ) + self.args = self.parser.parse_args() + + +# pylint: disable=too-few-public-methods +class Colors(enum.EnumType): + """static color definitions""" + + purple = "\033[95m" + blue = "\033[94m" + green = "\033[92m" + yellow = "\033[93m" + red = "\033[91m" + grey = "\033[1;37m" + darkgrey = "\033[1;30m" + cyan = "\033[1;36m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + blueblue = "\x1b[38;5;24m" + greenie = "\x1b[38;5;23m" + goo = "\x1b[38;5;22m" + screen_clear = "\033c\033[3J" + hide_cursor = "\033[?25l" + + +# pylint: disable=too-many-locals +def ffs( + offset: int, + header_list: typing.Optional[typing.List[str]], + numbered: bool, + no_color: bool, + *args, +) -> typing.List[str]: + """A simple columnar printer""" + max_column_width = [] + lines = [] + numbers_f: typing.List[int] = [] + dummy = [] + + if no_color or not sys.stdout.isatty(): + greenie = "" + bold = "" + endc = "" + goo = "" + blueblue = "" + else: + greenie = Colors.greenie + bold = Colors.BOLD + endc = Colors.ENDC + goo = Colors.goo + blueblue = Colors.blueblue + + for arg in args: + max_column_width.append(max(len(repr(argette)) for argette in arg)) + + if header_list is not None: + if numbered: + numbers_f.extend(range(1, len(args[-1]) + 1)) + max_column_width.append( + max(len(repr(number)) for number in numbers_f) + ) + header_list.insert(0, "idx") + + index = range(0, len(header_list)) + for header, width, i in zip(header_list, max_column_width, index): + max_column_width[i] = max(len(header), width) + offset + + for i in index: + dummy.append( + greenie + + bold + + header_list[i].ljust(max_column_width[i]) + + endc + ) + lines.append("".join(dummy)) + dummy.clear() + + index2 = range(0, len(args[-1])) + for i in index2: + if numbered: + dummy.append( + goo + bold + repr(i).ljust(max_column_width[0]) + endc + ) + for arg, width in zip(args, max_column_width[1:]): + dummy.append(blueblue + (arg[i]).ljust(width) + endc) + else: + for arg, width in zip(args, max_column_width): + dummy.append(blueblue + (arg[i]).ljust(width) + endc) + lines.append("".join(dummy)) + dummy.clear() + return lines + + +def render( + data_cols: typing.Dict[str, typing.Dict[str, str]], + tasks: typing.List[asyncio.Task], + stdscr, + sel: int, +): + """Render the text""" + lines = ffs( + 2, + ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"], + False, + True, + [v["name"] for _, v in data_cols.items()], + [v["address"] for _, v in data_cols.items()], + [repr(v["port"]) for _, v in data_cols.items()], + [v["status"] for _, v in data_cols.items()], + [v["stdout"] for _, v in data_cols.items()], + [v["stderr"] for _, v in data_cols.items()], + ) + iterator = iter(lines) + stdscr.addstr(1, 1, lines[0], curses.color_pair(3)) + next(iterator) + for i, line in enumerate(iterator): + try: + line_content = stdscr.instr(sel + 2, 1).decode("utf-8") + name: str = line_content[: line_content.find(" ")] + finally: + name = "" + if i == sel: + stdscr.addstr( + (2 + i) % (len(lines) + 1), + 1, + line, + curses.color_pair(2) + if name not in tasks + else curses.color_pair(5), + ) + else: + stdscr.addstr( + 2 + i, + 1, + line, + curses.color_pair(1) + if name not in tasks + else curses.color_pair(4), + ) + stdscr.addstr("\n") + stdscr.box() + + +def curses_init(): + """Initialize ncurses""" + stdscr = curses.initscr() + curses.start_color() + curses.use_default_colors() + curses.curs_set(False) + curses.noecho() + curses.cbreak() + stdscr.keypad(True) + curses.halfdelay(20) + curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) + curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN) + curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK) + curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) + curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_CYAN) + + return stdscr + + +class TunnelManager: + """The tunnel top class""" + + def __init__(self): + self.argparser = Argparser() + self.data_cols: typing.Dict[ + str, typing.Dict[str, str] + ] = self.read_conf() + self.tunnel_tasks: typing.List[asyncio.Task] = [] + self.tunnel_test_tasks: typing.List[asyncio.Task] = [] + self.scheduler_task: asyncio.Task + self.scheduler_table: typing.Dict[ + str, int + ] = self.init_scheduler_table() + # we use this when its time to quit. this will prevent any + # new tasks from being scheduled + self.are_we_dying: bool = False + + def init_scheduler_table(self) -> typing.Dict[str, int]: + """initialize the scheduler table""" + result: typing.Dict[str, int] = {} + for key, value in self.data_cols.items(): + if "test_interval" in value and value["test_command"] != "": + result[key] = 0 + + return result + + async def stop_task( + self, + delete_task: asyncio.Task, + task_list: typing.List[asyncio.Task], + delete: bool = True, + ): + """Remove the reference""" + delete_index: int = -1 + delete_task.cancel() + self.write_log(f"{delete_task.get_name()} is being cancelled\n") + await asyncio.sleep(0) + for i, task in enumerate(task_list): + if task.get_name() == delete_task.get_name(): + delete_index = i + break + + if delete and delete_index >= 0: + task_list.remove(self.tunnel_tasks[delete_index]) + + def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]: + """Read the config file""" + data_cols: typing.Dict[str, typing.Dict[str, str]] = {} + with open( + os.path.expanduser(self.argparser.args.config), "rb" + ) as conf_file: + data = tomllib.load(conf_file) + for key, value in data.items(): + data_cols[key] = { + "name": key, + "address": value["address"], + "port": value["port"], + "command": value["command"], + "status": "UNKN", + "test_command": value["test_command"], + "test_command_result": value["test_command_result"], + "test_interval": value["test_interval"], + "test_timeout": value["test_timeout"], + "stdout": "", + "stderr": "", + } + return data_cols + + async def run_subprocess(self, cmd: str) -> typing.Tuple[bytes, bytes]: + """Run a command""" + try: + proc = await asyncio.create_subprocess_exec( + *cmd.split(" "), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + return await proc.communicate() + except asyncio.TimeoutError: + proc.terminate() + raise + except asyncio.CancelledError: + proc.terminate() + raise + + async def run_test_coro( + self, cmd: str, task_name: str + ) -> typing.Tuple[bytes, bytes]: + """Run a test command""" + try: + stdout, stderr = await self.run_subprocess(cmd) + stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"') + stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"') + + self.data_cols[task_name]["stdout"] = stdout_str + self.data_cols[task_name]["stderr"] = stderr_str + if stdout_str == self.data_cols[task_name]["test_command_result"]: + self.data_cols[task_name]["status"] = "UP" + else: + self.data_cols[task_name]["status"] = "DOWN" + + return stdout, stderr + except asyncio.TimeoutError: + self.data_cols[task_name]["status"] = "TMOUT" + self.data_cols[task_name]["stdout"] = "-" + self.data_cols[task_name]["stderr"] = "-" + raise + + async def tunnel_procs( + self, + ) -> typing.List[asyncio.Task]: + """run all the tunnels in the background as separate subprocesses""" + tasks: typing.List[asyncio.Task] = [] + for _, value in self.data_cols.items(): + tasks.append( + asyncio.create_task( + self.run_subprocess(value["command"]), name=value["name"] + ), + ) + await asyncio.sleep(0) + + return tasks + + async def sighup_handler_async_worker(self, data_cols_new) -> None: + """Handles the actual updating of tasks when we get SIGTERM""" + delete_task: typing.Optional[asyncio.Task] = None + for k, value in data_cols_new.items(): + if k not in self.data_cols: + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subprocess(value["command"]), name=k + ) + ) + await asyncio.sleep(0) + self.data_cols[k] = copy.deepcopy(value) + if k in self.scheduler_table: + self.scheduler_table[k] = 0 + else: + if ( + self.data_cols[k]["command"] != data_cols_new[k]["command"] + or self.data_cols[k]["port"] != data_cols_new[k]["port"] + or self.data_cols[k]["address"] + != data_cols_new[k]["address"] + ): + for task in self.tunnel_tasks: + if task.get_name() == k: + delete_task = task + break + # task.cancel() + # await asyncio.sleep(0) + + if delete_task is not None: + await self.stop_task(delete_task, self.tunnel_tasks) + delete_task = None + self.data_cols[k] = copy.deepcopy(data_cols_new[k]) + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subprocess(value["command"]), name=k + ) + ) + if k in self.scheduler_table: + self.scheduler_table[k] = 0 + await asyncio.sleep(0) + + for k, _ in self.data_cols.items(): + if k not in data_cols_new: + for task in self.tunnel_tasks: + if task.get_name() == k: + # task.cancel() + # await asyncio.sleep(0) + delete_task = task + break + if delete_task is not None: + await self.stop_task(delete_task, self.tunnel_tasks) + delete_task = None + del self.data_cols[k] + if k in self.scheduler_table: + del self.scheduler_table[k] + + async def sighup_handler(self) -> None: + """SIGHUP handler. we want to reload the config.""" + # type: ignore # pylint: disable=E0203 + data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} + data_cols_new = self.read_conf() + await self.sighup_handler_async_worker(data_cols_new) + + def write_log(self, log: str): + """A simple logger""" + with open( + "/home/devi/devi/abbatoir/hole15/log", + "a", + encoding="utf-8", + ) as logfile: + logfile.write(log) + + async def restart_task(self, line_content: str) -> None: + """restart a task""" + name: str = line_content[: line_content.find(" ")] + # was_cancelled: bool = False + for task in self.tunnel_tasks: + if task.get_name() == name: + # was_cancelled = task.cancel() + # self.write_log(f"was_cancelled: {was_cancelled}") + await self.stop_task(task, self.tunnel_tasks) + # await task + # await asyncio.sleep(0) + for _, value in self.data_cols.items(): + if value["name"] == name and task.cancelled(): + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subprocess(value["command"]), + name=value["name"], + ) + ) + await asyncio.sleep(0) + + async def flip_task(self, line_content: str) -> None: + """flip a task""" + name: str = line_content[: line_content.find(" ")] + # was_cancelled: bool = False + was_active: bool = False + for task in self.tunnel_tasks: + if task.get_name() == name: + await self.stop_task(task, self.tunnel_tasks) + # was_cancelled = task.cancel() + # await asyncio.sleep(0) + # self.write_log(f"was_cancelled: {was_cancelled}") + # await task + was_active = True + break + + if not was_active: + for _, value in self.data_cols.items(): + if value["name"] == name: + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subprocess(value["command"]), + name=value["name"], + ) + ) + await asyncio.sleep(0) + + async def quit(self) -> None: + """Cleanly quit the applicaiton""" + # scheduler checks for this to stop running new tests + # when we want to quit + self.are_we_dying = True + + for task in asyncio.all_tasks(): + task.cancel() + await asyncio.sleep(0) + try: + await asyncio.gather(*asyncio.all_tasks()) + finally: + sys.exit(0) + + async def scheduler(self) -> None: + """scheduler manages running the tests and reviving dead tunnels""" + try: + while True: + if self.are_we_dying: + return + for key, value in self.scheduler_table.items(): + if value == 0 and key not in self.tunnel_test_tasks: + tunnel_entry = self.data_cols[key] + test_task = asyncio.create_task( + asyncio.wait_for( + self.run_test_coro( + tunnel_entry["test_command"], + tunnel_entry["name"], + ), + timeout=float(tunnel_entry["test_timeout"]), + ), + name=key, + ) + self.tunnel_test_tasks.append(test_task) + self.scheduler_table[key] = int( + tunnel_entry["test_interval"] + ) + await asyncio.sleep(0) + else: + self.scheduler_table[key] = ( + self.scheduler_table[key] - 1 + ) + + # we are using a 1 second ticker. basically the scheduler + # runs every second instead of as fast as it can + await asyncio.sleep(1) + except asyncio.CancelledError: + pass + + async def tui_loop(self) -> None: + """the tui loop""" + sel: int = 0 + try: + stdscr = curses_init() + # we spawn the tunnels and the test scheduler put them + # in the background and then run the TUI loop + self.tunnel_tasks = await self.tunnel_procs() + self.scheduler_task = asyncio.create_task( + self.scheduler(), name="scheduler" + ) + loop = asyncio.get_event_loop() + loop.add_signal_handler( + signal.SIGHUP, + lambda: asyncio.create_task(self.sighup_handler()), + ) + + while True: + stdscr.clear() + render(self.data_cols, self.tunnel_tasks, stdscr, sel) + char = stdscr.getch() + + if char == ord("j") or char == curses.KEY_DOWN: + sel = (sel + 1) % len(self.data_cols) + elif char == ord("k") or char == curses.KEY_UP: + sel = (sel - 1) % len(self.data_cols) + elif char == ord("g") or char == curses.KEY_UP: + sel = 0 + elif char == ord("G") or char == curses.KEY_UP: + sel = len(self.data_cols) - 1 + elif char == ord("r"): + line_content = stdscr.instr(sel + 2, 1) + await self.restart_task(line_content.decode("utf-8")) + elif char == ord("q"): + await self.quit() + elif char == ord("s"): + line_content = stdscr.instr(sel + 2, 1) + await self.flip_task(line_content.decode("utf-8")) + + stdscr.refresh() + await asyncio.sleep(0) + finally: + curses.nocbreak() + stdscr.keypad(False) + curses.echo() + curses.endwin() + # tasks = asyncio.all_tasks() + # for task in tasks: + # task.cancel() + await self.quit() + + +def main() -> None: + """entry point""" + tunnel_manager = TunnelManager() + asyncio.run(tunnel_manager.tui_loop()) + + +if __name__ == "__main__": + main() -- cgit v1.2.3