From 4b29c35f6b5b03d19e9cd723de30a885311c497b Mon Sep 17 00:00:00 2001 From: terminaldweller Date: Thu, 19 Jan 2023 00:13:08 +0330 Subject: update --- bin/tunneltop | 264 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 191 insertions(+), 73 deletions(-) (limited to 'bin/tunneltop') diff --git a/bin/tunneltop b/bin/tunneltop index 7dba98c..7bf1878 100755 --- a/bin/tunneltop +++ b/bin/tunneltop @@ -4,6 +4,7 @@ import argparse import asyncio import copy +import curses import enum import os import signal @@ -70,6 +71,7 @@ def ffs( offset: int, header_list: typing.Optional[typing.List[str]], numbered: bool, + no_color: bool, *args, ) -> typing.List[str]: """A simple columnar printer""" @@ -78,18 +80,18 @@ def ffs( numbers_f: typing.List[int] = [] dummy = [] - if sys.stdout.isatty(): - greenie = Colors.greenie - bold = Colors.BOLD - endc = Colors.ENDC - goo = Colors.goo - blueblue = Colors.blueblue - else: + if no_color or not sys.stdout.isatty(): greenie = "" bold = "" endc = "" goo = "" blueblue = "" + else: + greenie = Colors.greenie + bold = Colors.BOLD + endc = Colors.ENDC + goo = Colors.goo + blueblue = Colors.blueblue for arg in args: max_column_width.append(max(len(repr(argette)) for argette in arg)) @@ -132,6 +134,40 @@ def ffs( return lines +def render(lines: typing.List[str], stdscr, sel: int): + """Render the text""" + iterator = iter(lines) + stdscr.addstr(1, 1, lines[0], curses.color_pair(3)) + next(iterator) + for i, line in enumerate(iterator): + if i == sel: + stdscr.addstr( + (2 + i) % (len(lines) + 1), 1, line, curses.color_pair(2) + ) + else: + stdscr.addstr(2 + i, 1, line, curses.color_pair(1)) + stdscr.addstr("\n") + stdscr.box() + + +def curses_init(): + """Initialize ncurses""" + stdscr = curses.initscr() + curses.start_color() + curses.use_default_colors() + curses.curs_set(False) + curses.noecho() + curses.cbreak() + stdscr.keypad(True) + curses.halfdelay(20) + curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) + curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN) + curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK) + curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) + + return stdscr + + class TunnelTop: """The tunnel top class""" @@ -141,13 +177,40 @@ class TunnelTop: self.tunnel_tasks: typing.List[asyncio.Task] = [] self.tunnel_test_tasks: typing.List[asyncio.Task] = [] + def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]: + """Read the config file""" + data_cols: typing.Dict[str, typing.Dict[str, str]] = {} + with open( + os.path.expanduser(self.argparser.args.config), "rb" + ) as conf_file: + data = tomllib.load(conf_file) + for key, value in data.items(): + data_cols[key] = { + "name": key, + "address": value["address"], + "port": value["port"], + "command": value["command"], + "status": "UNKN", + "test_command": value["test_command"], + "test_command_result": value["test_command_result"], + "test_interval": value["test_interval"], + "test_timeout": value["test_timeout"], + "stdout": "", + "stderr": "", + } + return data_cols + async def run_subshell(self, cmd: str) -> typing.Tuple[bytes, bytes]: """Run a command in a subshell""" proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) - return await proc.communicate() + try: + return await proc.communicate() + except asyncio.CancelledError: + self.write_log("fucking fuck") + return (bytes(), bytes()) def tunnel_test_callback(self, task: asyncio.Task) -> None: """Tunnel test callback function.""" @@ -168,6 +231,8 @@ class TunnelTop: self.data_cols[task_name]["status"] = "DOWN" except asyncio.TimeoutError: self.data_cols[task_name]["status"] = "TMOUT" + except asyncio.CancelledError: + self.data_cols[task_name]["status"] = "CANCELLED" async def tunnel_test_procs(self) -> typing.List[asyncio.Task]: """run all the tunnel tests in the background as separate tasks""" @@ -203,7 +268,7 @@ class TunnelTop: return tasks - async def sighup_handler_async_worker(self, data_cols_new): + async def sighup_handler_async_worker(self, data_cols_new) -> None: """Handles the actual updating of tasks when we get SIGTERM""" for k, value in data_cols_new.items(): if k not in self.data_cols: @@ -238,80 +303,133 @@ class TunnelTop: task.cancel() del self.data_cols[k] - async def sighup_handler(self): + async def sighup_handler(self) -> None: """SIGHUP handler. we want to reload the config.""" # type: ignore # pylint: disable=E0203 data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} - with open(self.argparser.args.config, "rb") as conf_file: - data = tomllib.load(conf_file) - for key, value in data.items(): - data_cols_new[key] = { - "name": key, - "address": value["address"], - "port": value["port"], - "command": value["command"], - "status": "UNKN", - "test_command": value["test_command"], - "test_command_result": value["test_command_result"], - "test_interval": value["test_interval"], - "test_timeout": value["test_timeout"], - "stdout": "", - "stderr": "", - } + data_cols_new = self.read_conf() await self.sighup_handler_async_worker(data_cols_new) + def write_log(self, log: str): + """A simple logger""" + with open( + "/home/devi/devi/abbatoir/hole15/log", + "w", + encoding="utf-8", + ) as logfile: + logfile.write(log) + + async def restart_task(self, line_content: str) -> None: + """restart a task""" + name: str = line_content[: line_content.find(" ")] + was_cancelled: bool = False + for task in self.tunnel_tasks: + if task.get_name() == name: + was_cancelled = task.cancel() + self.write_log(f"was_cancelled: {was_cancelled}") + await task + for _, value in self.data_cols.items(): + if value["name"] == name: + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subshell(value["command"]), + name=value["name"], + ) + ) + await asyncio.sleep(0) + + async def flip_task(self, line_content: str) -> None: + """flip a task""" + name: str = line_content[: line_content.find(" ")] + was_cancelled: bool = False + was_active: bool = False + for task in self.tunnel_tasks: + if task.get_name() == name: + was_cancelled = task.cancel() + await asyncio.sleep(0) + self.write_log(f"was_cancelled: {was_cancelled}") + await task + was_active = True + break + + if not was_active: + for _, value in self.data_cols.items(): + if value["name"] == name: + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subshell(value["command"]), + name=value["name"], + ) + ) + await asyncio.sleep(0) + break + + async def quit(self) -> None: + """Cleanly quit the applicaiton""" + for tunnel_test_task in self.tunnel_test_tasks: + tunnel_test_task.cancel() + for tunnel_task in self.tunnel_tasks: + tunnel_task.cancel() + async def main(self) -> None: """entrypoint""" - print(Colors.screen_clear, end="") - print(Colors.hide_cursor, end="") + sel: int = 0 + try: + stdscr = curses_init() - with open( - os.path.expanduser(self.argparser.args.config), "rb" - ) as conf_file: - data = tomllib.load(conf_file) - for key, value in data.items(): - self.data_cols[key] = { - "name": key, - "address": value["address"], - "port": value["port"], - "command": value["command"], - "status": "UNKN", - "test_command": value["test_command"], - "test_command_result": value["test_command_result"], - "test_interval": value["test_interval"], - "test_timeout": value["test_timeout"], - "stdout": "", - "stderr": "", - } + self.data_cols = self.read_conf() - loop = asyncio.get_event_loop() - loop.add_signal_handler( - signal.SIGHUP, - lambda: asyncio.create_task(self.sighup_handler()), - ) - self.tunnel_tasks = await self.tunnel_procs() - - while True: - self.tunnel_test_tasks = await self.tunnel_test_procs() - lines = ffs( - 2, - ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] - if not self.argparser.args.noheader - else None, - False, - [v["name"] for _, v in self.data_cols.items()], - [v["address"] for _, v in self.data_cols.items()], - [repr(v["port"]) for _, v in self.data_cols.items()], - [v["status"] for _, v in self.data_cols.items()], - [v["stdout"] for _, v in self.data_cols.items()], - [v["stderr"] for _, v in self.data_cols.items()], + loop = asyncio.get_event_loop() + loop.add_signal_handler( + signal.SIGHUP, + lambda: asyncio.create_task(self.sighup_handler()), ) - for line in lines: - print(line) - - await asyncio.sleep(self.argparser.args.delay) - print(Colors.screen_clear, end="") - print(Colors.hide_cursor, end="") + self.tunnel_tasks = await self.tunnel_procs() + + while True: + # self.tunnel_test_tasks = await self.tunnel_test_procs() + lines = ffs( + 2, + ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] + if not self.argparser.args.noheader + else None, + False, + True, + [v["name"] for _, v in self.data_cols.items()], + [v["address"] for _, v in self.data_cols.items()], + [repr(v["port"]) for _, v in self.data_cols.items()], + [v["status"] for _, v in self.data_cols.items()], + [v["stdout"] for _, v in self.data_cols.items()], + [v["stderr"] for _, v in self.data_cols.items()], + ) + stdscr.clear() + render(lines, stdscr, sel) + char = stdscr.getch() + + if char == ord("j") or char == curses.KEY_DOWN: + sel = (sel + 1) % len(self.data_cols) + elif char == ord("k") or char == curses.KEY_UP: + sel = (sel - 1) % len(self.data_cols) + elif char == ord("r"): + line_content = stdscr.instr(sel + 2, 1) + await self.restart_task(line_content.decode("utf-8")) + elif char == ord("q"): + await self.quit() + # elif char == curses.KEY_ENTER: + elif char == ord("s"): + line_content = stdscr.instr(sel + 2, 1) + await self.flip_task(line_content.decode("utf-8")) + + stdscr.refresh() + await asyncio.sleep(0) + finally: + curses.nocbreak() + stdscr.keypad(False) + curses.echo() + curses.endwin() + tasks = asyncio.all_tasks() + for task in tasks: + task.cancel() if __name__ == "__main__": -- cgit v1.2.3