diff options
| author | terminaldweller <thabogre@gmail.com> | 2023-02-01 14:42:58 +0000 | 
|---|---|---|
| committer | terminaldweller <thabogre@gmail.com> | 2023-02-01 14:42:58 +0000 | 
| commit | 8bc9e578472d8a3f1bc66c50fd8c808ba63777d4 (patch) | |
| tree | e1b62b927af6d283e66bbff15b39a2e737e86a37 /bin/tunneltop | |
| parent | update (diff) | |
| download | scripts-8bc9e578472d8a3f1bc66c50fd8c808ba63777d4.tar.gz scripts-8bc9e578472d8a3f1bc66c50fd8c808ba63777d4.zip | |
updates
Diffstat (limited to 'bin/tunneltop')
| -rwxr-xr-x | bin/tunneltop | 554 | 
1 files changed, 0 insertions, 554 deletions
| diff --git a/bin/tunneltop b/bin/tunneltop deleted file mode 100755 index df3510c..0000000 --- a/bin/tunneltop +++ /dev/null @@ -1,554 +0,0 @@ -#!/usr/bin/env python -"""A top-like program for monitoring ssh tunnels""" -# TODO- task cancellation is very slow as should be with tasks -import argparse -import asyncio -import copy -import curses -import enum -import os -import signal -import sys -import tomllib -import typing - - -class Argparser:  # pylint: disable=too-few-public-methods -    """Argparser class.""" - -    def __init__(self): -        self.parser = argparse.ArgumentParser() -        self.parser.add_argument( -            "--config", -            "-c", -            type=str, -            help="The path to the .tunneltop.toml file," -            " defaults to $HOME/.tunneltop.toml", -            default="~/.tunneltop.toml", -        ) -        self.parser.add_argument( -            "--noheader", -            "-n", -            type=bool, -            help="Dont print the header in the output", -            default=False, -        ) -        self.parser.add_argument( -            "--delay", -            "-d", -            type=float, -            help="The delay between redraws in seconds, defaults to 5 seconds", -            default=5, -        ) -        self.args = self.parser.parse_args() - - -# pylint: disable=too-few-public-methods -class Colors(enum.EnumType): -    """static color definitions""" - -    purple = "\033[95m" -    blue = "\033[94m" -    green = "\033[92m" -    yellow = "\033[93m" -    red = "\033[91m" -    grey = "\033[1;37m" -    darkgrey = "\033[1;30m" -    cyan = "\033[1;36m" -    ENDC = "\033[0m" -    BOLD = "\033[1m" -    UNDERLINE = "\033[4m" -    blueblue = "\x1b[38;5;24m" -    greenie = "\x1b[38;5;23m" -    goo = "\x1b[38;5;22m" -    screen_clear = "\033c\033[3J" -    hide_cursor = "\033[?25l" - - -# pylint: disable=too-many-locals -def ffs( -    offset: int, -    header_list: typing.Optional[typing.List[str]], -    numbered: bool, -    no_color: bool, -    *args, -) -> typing.List[str]: -    """A simple columnar printer""" -    max_column_width = [] -    lines = [] -    numbers_f: typing.List[int] = [] -    dummy = [] - -    if no_color or not sys.stdout.isatty(): -        greenie = "" -        bold = "" -        endc = "" -        goo = "" -        blueblue = "" -    else: -        greenie = Colors.greenie -        bold = Colors.BOLD -        endc = Colors.ENDC -        goo = Colors.goo -        blueblue = Colors.blueblue - -    for arg in args: -        max_column_width.append(max(len(repr(argette)) for argette in arg)) - -    if header_list is not None: -        if numbered: -            numbers_f.extend(range(1, len(args[-1]) + 1)) -            max_column_width.append( -                max(len(repr(number)) for number in numbers_f) -            ) -            header_list.insert(0, "idx") - -        index = range(0, len(header_list)) -        for header, width, i in zip(header_list, max_column_width, index): -            max_column_width[i] = max(len(header), width) + offset - -        for i in index: -            dummy.append( -                greenie -                + bold -                + header_list[i].ljust(max_column_width[i]) -                + endc -            ) -        lines.append("".join(dummy)) -        dummy.clear() - -    index2 = range(0, len(args[-1])) -    for i in index2: -        if numbered: -            dummy.append( -                goo + bold + repr(i).ljust(max_column_width[0]) + endc -            ) -            for arg, width in zip(args, max_column_width[1:]): -                dummy.append(blueblue + (arg[i]).ljust(width) + endc) -        else: -            for arg, width in zip(args, max_column_width): -                dummy.append(blueblue + (arg[i]).ljust(width) + endc) -        lines.append("".join(dummy)) -        dummy.clear() -    return lines - - -def render( -    data_cols: typing.Dict[str, typing.Dict[str, str]], -    tasks: typing.List[asyncio.Task], -    stdscr, -    sel: int, -): -    """Render the text""" -    lines = ffs( -        2, -        ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"], -        False, -        True, -        [v["name"] for _, v in data_cols.items()], -        [v["address"] for _, v in data_cols.items()], -        [repr(v["port"]) for _, v in data_cols.items()], -        [v["status"] for _, v in data_cols.items()], -        [v["stdout"] for _, v in data_cols.items()], -        [v["stderr"] for _, v in data_cols.items()], -    ) -    iterator = iter(lines) -    stdscr.addstr(1, 1, lines[0], curses.color_pair(3)) -    next(iterator) -    for i, line in enumerate(iterator): -        try: -            line_content = stdscr.instr(sel + 2, 1).decode("utf-8") -            name: str = line_content[: line_content.find(" ")] -        finally: -            name = "" -        if i == sel: -            stdscr.addstr( -                (2 + i) % (len(lines) + 1), -                1, -                line, -                curses.color_pair(2) -                if name not in tasks -                else curses.color_pair(5), -            ) -        else: -            stdscr.addstr( -                2 + i, -                1, -                line, -                curses.color_pair(1) -                if name not in tasks -                else curses.color_pair(4), -            ) -        stdscr.addstr("\n") -    stdscr.box() - - -def curses_init(): -    """Initialize ncurses""" -    stdscr = curses.initscr() -    curses.start_color() -    curses.use_default_colors() -    curses.curs_set(False) -    curses.noecho() -    curses.cbreak() -    stdscr.keypad(True) -    curses.halfdelay(20) -    curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) -    curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN) -    curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK) -    curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) -    curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_CYAN) - -    return stdscr - - -class TunnelManager: -    """The tunnel top class""" - -    def __init__(self): -        self.argparser = Argparser() -        self.data_cols: typing.Dict[ -            str, typing.Dict[str, str] -        ] = self.read_conf() -        self.tunnel_tasks: typing.List[asyncio.Task] = [] -        self.tunnel_test_tasks: typing.List[asyncio.Task] = [] -        self.scheduler_task: asyncio.Task -        self.scheduler_table: typing.Dict[ -            str, int -        ] = self.init_scheduler_table() -        # we use this when its time to quit. this will prevent any -        # new tasks from being scheduled -        self.are_we_dying: bool = False - -    def init_scheduler_table(self) -> typing.Dict[str, int]: -        """initialize the scheduler table""" -        result: typing.Dict[str, int] = {} -        for key, value in self.data_cols.items(): -            if "test_interval" in value and value["test_command"] != "": -                result[key] = 0 - -        return result - -    async def stop_task( -        self, -        delete_task: asyncio.Task, -        task_list: typing.List[asyncio.Task], -        delete: bool = True, -    ): -        """Remove the reference""" -        delete_index: int = -1 -        delete_task.cancel() -        self.write_log(f"{delete_task.get_name()} is being cancelled\n") -        await asyncio.sleep(0) -        for i, task in enumerate(task_list): -            if task.get_name() == delete_task.get_name(): -                delete_index = i -                break - -        if delete and delete_index >= 0: -            task_list.remove(self.tunnel_tasks[delete_index]) - -    def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]: -        """Read the config file""" -        data_cols: typing.Dict[str, typing.Dict[str, str]] = {} -        with open( -            os.path.expanduser(self.argparser.args.config), "rb" -        ) as conf_file: -            data = tomllib.load(conf_file) -            for key, value in data.items(): -                data_cols[key] = { -                    "name": key, -                    "address": value["address"], -                    "port": value["port"], -                    "command": value["command"], -                    "status": "UNKN", -                    "test_command": value["test_command"], -                    "test_command_result": value["test_command_result"], -                    "test_interval": value["test_interval"], -                    "test_timeout": value["test_timeout"], -                    "stdout": "", -                    "stderr": "", -                } -        return data_cols - -    async def run_subprocess(self, cmd: str) -> typing.Tuple[bytes, bytes]: -        """Run a command""" -        try: -            proc = await asyncio.create_subprocess_exec( -                *cmd.split(" "), -                stdout=asyncio.subprocess.PIPE, -                stderr=asyncio.subprocess.PIPE, -            ) - -            return await proc.communicate() -        except asyncio.TimeoutError: -            proc.terminate() -            return (bytes(), bytes()) -        except asyncio.CancelledError: -            proc.terminate() -            raise - -    async def run_test_coro( -        self, cmd: str, task_name: str -    ) -> typing.Tuple[bytes, bytes]: -        """Run a test command""" -        try: -            stdout, stderr = await self.run_subprocess(cmd) -            stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"') -            stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"') - -            self.data_cols[task_name]["stdout"] = stdout_str -            self.data_cols[task_name]["stderr"] = stderr_str -            if stdout_str == self.data_cols[task_name]["test_command_result"]: -                self.data_cols[task_name]["status"] = "UP" -            else: -                self.data_cols[task_name]["status"] = "DOWN" - -            return stdout, stderr -        except asyncio.TimeoutError: -            self.data_cols[task_name]["status"] = "TMOUT" -            raise - -    async def tunnel_procs( -        self, -    ) -> typing.List[asyncio.Task]: -        """run all the tunnels in the background as separate tasks""" -        tasks: typing.List[asyncio.Task] = [] -        for _, value in self.data_cols.items(): -            tasks.append( -                asyncio.create_task( -                    self.run_subprocess(value["command"]), name=value["name"] -                ), -            ) -            await asyncio.sleep(0) - -        return tasks - -    async def sighup_handler_async_worker(self, data_cols_new) -> None: -        """Handles the actual updating of tasks when we get SIGTERM""" -        delete_task: typing.Optional[asyncio.Task] = None -        for k, value in data_cols_new.items(): -            if k not in self.data_cols: -                self.tunnel_tasks.append( -                    asyncio.create_task( -                        self.run_subprocess(value["command"]), name=k -                    ) -                ) -                await asyncio.sleep(0) -                self.data_cols[k] = copy.deepcopy(value) -                if k in self.scheduler_table: -                    self.scheduler_table[k] = 0 -            else: -                if ( -                    self.data_cols[k]["command"] != data_cols_new[k]["command"] -                    or self.data_cols[k]["port"] != data_cols_new[k]["port"] -                    or self.data_cols[k]["address"] -                    != data_cols_new[k]["address"] -                ): -                    for task in self.tunnel_tasks: -                        if task.get_name() == k: -                            delete_task = task -                            break -                            # task.cancel() -                            # await asyncio.sleep(0) - -                    if delete_task is not None: -                        await self.stop_task(delete_task, self.tunnel_tasks) -                        delete_task = None -                    self.data_cols[k] = copy.deepcopy(data_cols_new[k]) -                    self.tunnel_tasks.append( -                        asyncio.create_task( -                            self.run_subprocess(value["command"]), name=k -                        ) -                    ) -                    if k in self.scheduler_table: -                        self.scheduler_table[k] = 0 -                    await asyncio.sleep(0) - -        for k, _ in self.data_cols.items(): -            if k not in data_cols_new: -                for task in self.tunnel_tasks: -                    if task.get_name() == k: -                        # task.cancel() -                        # await asyncio.sleep(0) -                        delete_task = task -                        break -                if delete_task is not None: -                    await self.stop_task(delete_task, self.tunnel_tasks) -                    delete_task = None -                del self.data_cols[k] -                if k in self.scheduler_table: -                    del self.scheduler_table[k] - -    async def sighup_handler(self) -> None: -        """SIGHUP handler. we want to reload the config.""" -        # type: ignore # pylint: disable=E0203 -        data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} -        data_cols_new = self.read_conf() -        await self.sighup_handler_async_worker(data_cols_new) - -    def write_log(self, log: str): -        """A simple logger""" -        with open( -            "/home/devi/devi/abbatoir/hole15/log", -            "a", -            encoding="utf-8", -        ) as logfile: -            logfile.write(log) - -    async def restart_task(self, line_content: str) -> None: -        """restart a task""" -        name: str = line_content[: line_content.find(" ")] -        # was_cancelled: bool = False -        for task in self.tunnel_tasks: -            if task.get_name() == name: -                # was_cancelled = task.cancel() -                # self.write_log(f"was_cancelled: {was_cancelled}") -                await self.stop_task(task, self.tunnel_tasks) -                # await task -                # await asyncio.sleep(0) -                for _, value in self.data_cols.items(): -                    if value["name"] == name and task.cancelled(): -                        self.tunnel_tasks.append( -                            asyncio.create_task( -                                self.run_subprocess(value["command"]), -                                name=value["name"], -                            ) -                        ) -                        await asyncio.sleep(0) - -    async def flip_task(self, line_content: str) -> None: -        """flip a task""" -        name: str = line_content[: line_content.find(" ")] -        # was_cancelled: bool = False -        was_active: bool = False -        for task in self.tunnel_tasks: -            if task.get_name() == name: -                await self.stop_task(task, self.tunnel_tasks) -                # was_cancelled = task.cancel() -                # await asyncio.sleep(0) -                # self.write_log(f"was_cancelled: {was_cancelled}") -                # await task -                was_active = True -                break - -        if not was_active: -            for _, value in self.data_cols.items(): -                if value["name"] == name: -                    self.tunnel_tasks.append( -                        asyncio.create_task( -                            self.run_subprocess(value["command"]), -                            name=value["name"], -                        ) -                    ) -                    await asyncio.sleep(0) - -    async def quit(self) -> None: -        """Cleanly quit the applicaiton""" -        # scheduler checks for this so stop making new tasks -        # when we want to quit -        self.are_we_dying = True - -        for task in asyncio.all_tasks(): -            task.cancel() -            await asyncio.sleep(0) -        try: -            await asyncio.gather(*asyncio.all_tasks()) -        finally: -            sys.exit(0) - -    async def scheduler(self) -> None: -        """schedulaer manages running the tests and reviving dead tunnels""" -        try: -            while True: -                if self.are_we_dying: -                    return -                for key, value in self.scheduler_table.items(): -                    if value == 0 and key not in self.tunnel_test_tasks: -                        tunnel_entry = self.data_cols[key] -                        test_task = asyncio.create_task( -                            asyncio.wait_for( -                                self.run_test_coro( -                                    tunnel_entry["test_command"], -                                    tunnel_entry["name"], -                                ), -                                timeout=float(tunnel_entry["test_timeout"]), -                            ), -                            name=key, -                        ) -                        self.tunnel_test_tasks.append(test_task) -                        self.scheduler_table[key] = int( -                            tunnel_entry["test_interval"] -                        ) -                        await asyncio.sleep(0) -                    else: -                        self.scheduler_table[key] = ( -                            self.scheduler_table[key] - 1 -                        ) - -                # we are using a 1 second ticker. basically the scheduler -                # runs every second instead of as fast as it can -                await asyncio.sleep(1) -        except asyncio.CancelledError: -            pass - -    async def tui_loop(self) -> None: -        """the tui loop""" -        sel: int = 0 -        try: -            stdscr = curses_init() -            # we spawn the tunnels and the test scheduler put them -            # in the background and then run the TUI loop -            self.tunnel_tasks = await self.tunnel_procs() -            self.scheduler_task = asyncio.create_task( -                self.scheduler(), name="scheduler" -            ) -            loop = asyncio.get_event_loop() -            loop.add_signal_handler( -                signal.SIGHUP, -                lambda: asyncio.create_task(self.sighup_handler()), -            ) - -            while True: -                stdscr.clear() -                render(self.data_cols, self.tunnel_tasks, stdscr, sel) -                char = stdscr.getch() - -                if char == ord("j") or char == curses.KEY_DOWN: -                    sel = (sel + 1) % len(self.data_cols) -                elif char == ord("k") or char == curses.KEY_UP: -                    sel = (sel - 1) % len(self.data_cols) -                elif char == ord("g") or char == curses.KEY_UP: -                    sel = 0 -                elif char == ord("G") or char == curses.KEY_UP: -                    sel = len(self.data_cols) - 1 -                elif char == ord("r"): -                    line_content = stdscr.instr(sel + 2, 1) -                    await self.restart_task(line_content.decode("utf-8")) -                elif char == ord("q"): -                    await self.quit() -                elif char == ord("s"): -                    line_content = stdscr.instr(sel + 2, 1) -                    await self.flip_task(line_content.decode("utf-8")) - -                for task in self.tunnel_tasks: -                    self.write_log( -                        f"{task.get_name()} is {task.cancelled()} or {task.cancelling()}\n" -                    ) - -                stdscr.refresh() -                await asyncio.sleep(0) -        finally: -            curses.nocbreak() -            stdscr.keypad(False) -            curses.echo() -            curses.endwin() -            # tasks = asyncio.all_tasks() -            # for task in tasks: -            #     task.cancel() -            await self.quit() - - -if __name__ == "__main__": -    tunnel_manager = TunnelManager() -    asyncio.run(tunnel_manager.tui_loop()) | 
