diff options
Diffstat (limited to '')
| -rwxr-xr-x | bin/tunneltop | 264 | 
1 files changed, 191 insertions, 73 deletions
| diff --git a/bin/tunneltop b/bin/tunneltop index 7dba98c..7bf1878 100755 --- a/bin/tunneltop +++ b/bin/tunneltop @@ -4,6 +4,7 @@  import argparse  import asyncio  import copy +import curses  import enum  import os  import signal @@ -70,6 +71,7 @@ def ffs(      offset: int,      header_list: typing.Optional[typing.List[str]],      numbered: bool, +    no_color: bool,      *args,  ) -> typing.List[str]:      """A simple columnar printer""" @@ -78,18 +80,18 @@ def ffs(      numbers_f: typing.List[int] = []      dummy = [] -    if sys.stdout.isatty(): -        greenie = Colors.greenie -        bold = Colors.BOLD -        endc = Colors.ENDC -        goo = Colors.goo -        blueblue = Colors.blueblue -    else: +    if no_color or not sys.stdout.isatty():          greenie = ""          bold = ""          endc = ""          goo = ""          blueblue = "" +    else: +        greenie = Colors.greenie +        bold = Colors.BOLD +        endc = Colors.ENDC +        goo = Colors.goo +        blueblue = Colors.blueblue      for arg in args:          max_column_width.append(max(len(repr(argette)) for argette in arg)) @@ -132,6 +134,40 @@ def ffs(      return lines +def render(lines: typing.List[str], stdscr, sel: int): +    """Render the text""" +    iterator = iter(lines) +    stdscr.addstr(1, 1, lines[0], curses.color_pair(3)) +    next(iterator) +    for i, line in enumerate(iterator): +        if i == sel: +            stdscr.addstr( +                (2 + i) % (len(lines) + 1), 1, line, curses.color_pair(2) +            ) +        else: +            stdscr.addstr(2 + i, 1, line, curses.color_pair(1)) +        stdscr.addstr("\n") +    stdscr.box() + + +def curses_init(): +    """Initialize ncurses""" +    stdscr = curses.initscr() +    curses.start_color() +    curses.use_default_colors() +    curses.curs_set(False) +    curses.noecho() +    curses.cbreak() +    stdscr.keypad(True) +    curses.halfdelay(20) +    curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) +    curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN) +    curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK) +    curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) + +    return stdscr + +  class TunnelTop:      """The tunnel top class""" @@ -141,13 +177,40 @@ class TunnelTop:          self.tunnel_tasks: typing.List[asyncio.Task] = []          self.tunnel_test_tasks: typing.List[asyncio.Task] = [] +    def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]: +        """Read the config file""" +        data_cols: typing.Dict[str, typing.Dict[str, str]] = {} +        with open( +            os.path.expanduser(self.argparser.args.config), "rb" +        ) as conf_file: +            data = tomllib.load(conf_file) +            for key, value in data.items(): +                data_cols[key] = { +                    "name": key, +                    "address": value["address"], +                    "port": value["port"], +                    "command": value["command"], +                    "status": "UNKN", +                    "test_command": value["test_command"], +                    "test_command_result": value["test_command_result"], +                    "test_interval": value["test_interval"], +                    "test_timeout": value["test_timeout"], +                    "stdout": "", +                    "stderr": "", +                } +        return data_cols +      async def run_subshell(self, cmd: str) -> typing.Tuple[bytes, bytes]:          """Run a command in a subshell"""          proc = await asyncio.create_subprocess_shell(              cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE          ) -        return await proc.communicate() +        try: +            return await proc.communicate() +        except asyncio.CancelledError: +            self.write_log("fucking fuck") +            return (bytes(), bytes())      def tunnel_test_callback(self, task: asyncio.Task) -> None:          """Tunnel test callback function.""" @@ -168,6 +231,8 @@ class TunnelTop:                  self.data_cols[task_name]["status"] = "DOWN"          except asyncio.TimeoutError:              self.data_cols[task_name]["status"] = "TMOUT" +        except asyncio.CancelledError: +            self.data_cols[task_name]["status"] = "CANCELLED"      async def tunnel_test_procs(self) -> typing.List[asyncio.Task]:          """run all the tunnel tests in the background as separate tasks""" @@ -203,7 +268,7 @@ class TunnelTop:          return tasks -    async def sighup_handler_async_worker(self, data_cols_new): +    async def sighup_handler_async_worker(self, data_cols_new) -> None:          """Handles the actual updating of tasks when we get SIGTERM"""          for k, value in data_cols_new.items():              if k not in self.data_cols: @@ -238,80 +303,133 @@ class TunnelTop:                          task.cancel()                  del self.data_cols[k] -    async def sighup_handler(self): +    async def sighup_handler(self) -> None:          """SIGHUP handler. we want to reload the config."""          # type: ignore # pylint: disable=E0203          data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} -        with open(self.argparser.args.config, "rb") as conf_file: -            data = tomllib.load(conf_file) -            for key, value in data.items(): -                data_cols_new[key] = { -                    "name": key, -                    "address": value["address"], -                    "port": value["port"], -                    "command": value["command"], -                    "status": "UNKN", -                    "test_command": value["test_command"], -                    "test_command_result": value["test_command_result"], -                    "test_interval": value["test_interval"], -                    "test_timeout": value["test_timeout"], -                    "stdout": "", -                    "stderr": "", -                } +        data_cols_new = self.read_conf()          await self.sighup_handler_async_worker(data_cols_new) +    def write_log(self, log: str): +        """A simple logger""" +        with open( +            "/home/devi/devi/abbatoir/hole15/log", +            "w", +            encoding="utf-8", +        ) as logfile: +            logfile.write(log) + +    async def restart_task(self, line_content: str) -> None: +        """restart a task""" +        name: str = line_content[: line_content.find(" ")] +        was_cancelled: bool = False +        for task in self.tunnel_tasks: +            if task.get_name() == name: +                was_cancelled = task.cancel() +                self.write_log(f"was_cancelled: {was_cancelled}") +                await task +        for _, value in self.data_cols.items(): +            if value["name"] == name: +                self.tunnel_tasks.append( +                    asyncio.create_task( +                        self.run_subshell(value["command"]), +                        name=value["name"], +                    ) +                ) +                await asyncio.sleep(0) + +    async def flip_task(self, line_content: str) -> None: +        """flip a task""" +        name: str = line_content[: line_content.find(" ")] +        was_cancelled: bool = False +        was_active: bool = False +        for task in self.tunnel_tasks: +            if task.get_name() == name: +                was_cancelled = task.cancel() +                await asyncio.sleep(0) +                self.write_log(f"was_cancelled: {was_cancelled}") +                await task +                was_active = True +                break + +        if not was_active: +            for _, value in self.data_cols.items(): +                if value["name"] == name: +                    self.tunnel_tasks.append( +                        asyncio.create_task( +                            self.run_subshell(value["command"]), +                            name=value["name"], +                        ) +                    ) +                    await asyncio.sleep(0) +                    break + +    async def quit(self) -> None: +        """Cleanly quit the applicaiton""" +        for tunnel_test_task in self.tunnel_test_tasks: +            tunnel_test_task.cancel() +        for tunnel_task in self.tunnel_tasks: +            tunnel_task.cancel() +      async def main(self) -> None:          """entrypoint""" -        print(Colors.screen_clear, end="") -        print(Colors.hide_cursor, end="") +        sel: int = 0 +        try: +            stdscr = curses_init() -        with open( -            os.path.expanduser(self.argparser.args.config), "rb" -        ) as conf_file: -            data = tomllib.load(conf_file) -            for key, value in data.items(): -                self.data_cols[key] = { -                    "name": key, -                    "address": value["address"], -                    "port": value["port"], -                    "command": value["command"], -                    "status": "UNKN", -                    "test_command": value["test_command"], -                    "test_command_result": value["test_command_result"], -                    "test_interval": value["test_interval"], -                    "test_timeout": value["test_timeout"], -                    "stdout": "", -                    "stderr": "", -                } +            self.data_cols = self.read_conf() -        loop = asyncio.get_event_loop() -        loop.add_signal_handler( -            signal.SIGHUP, -            lambda: asyncio.create_task(self.sighup_handler()), -        ) -        self.tunnel_tasks = await self.tunnel_procs() - -        while True: -            self.tunnel_test_tasks = await self.tunnel_test_procs() -            lines = ffs( -                2, -                ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] -                if not self.argparser.args.noheader -                else None, -                False, -                [v["name"] for _, v in self.data_cols.items()], -                [v["address"] for _, v in self.data_cols.items()], -                [repr(v["port"]) for _, v in self.data_cols.items()], -                [v["status"] for _, v in self.data_cols.items()], -                [v["stdout"] for _, v in self.data_cols.items()], -                [v["stderr"] for _, v in self.data_cols.items()], +            loop = asyncio.get_event_loop() +            loop.add_signal_handler( +                signal.SIGHUP, +                lambda: asyncio.create_task(self.sighup_handler()),              ) -            for line in lines: -                print(line) - -            await asyncio.sleep(self.argparser.args.delay) -            print(Colors.screen_clear, end="") -            print(Colors.hide_cursor, end="") +            self.tunnel_tasks = await self.tunnel_procs() + +            while True: +                # self.tunnel_test_tasks = await self.tunnel_test_procs() +                lines = ffs( +                    2, +                    ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] +                    if not self.argparser.args.noheader +                    else None, +                    False, +                    True, +                    [v["name"] for _, v in self.data_cols.items()], +                    [v["address"] for _, v in self.data_cols.items()], +                    [repr(v["port"]) for _, v in self.data_cols.items()], +                    [v["status"] for _, v in self.data_cols.items()], +                    [v["stdout"] for _, v in self.data_cols.items()], +                    [v["stderr"] for _, v in self.data_cols.items()], +                ) +                stdscr.clear() +                render(lines, stdscr, sel) +                char = stdscr.getch() + +                if char == ord("j") or char == curses.KEY_DOWN: +                    sel = (sel + 1) % len(self.data_cols) +                elif char == ord("k") or char == curses.KEY_UP: +                    sel = (sel - 1) % len(self.data_cols) +                elif char == ord("r"): +                    line_content = stdscr.instr(sel + 2, 1) +                    await self.restart_task(line_content.decode("utf-8")) +                elif char == ord("q"): +                    await self.quit() +                # elif char == curses.KEY_ENTER: +                elif char == ord("s"): +                    line_content = stdscr.instr(sel + 2, 1) +                    await self.flip_task(line_content.decode("utf-8")) + +                stdscr.refresh() +                await asyncio.sleep(0) +        finally: +            curses.nocbreak() +            stdscr.keypad(False) +            curses.echo() +            curses.endwin() +            tasks = asyncio.all_tasks() +            for task in tasks: +                task.cancel()  if __name__ == "__main__": | 
