diff options
Diffstat (limited to '')
| -rwxr-xr-x | bin/tunneltop | 231 | 
1 files changed, 155 insertions, 76 deletions
| diff --git a/bin/tunneltop b/bin/tunneltop index ebed5d9..1c65d0d 100755 --- a/bin/tunneltop +++ b/bin/tunneltop @@ -3,7 +3,9 @@  import argparse  import asyncio +import copy  import enum +import signal  import sys  import typing @@ -128,103 +130,180 @@ def ffs(      return lines -async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]: -    """Run a command in a subshell""" -    proc = await asyncio.create_subprocess_shell( -        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE -    ) +class TunnelTop: +    """The tunnel top class""" -    # return stdout and stderr -    return await proc.communicate() +    def __init__(self): +        self.argparser = Argparser() +        self.data_cols: typing.Dict[str, typing.Dict[str, str]] = {} +        self.tunnel_tasks: typing.List[asyncio.Task] = [] +        self.tunnel_test_tasks: typing.List[asyncio.Task] = [] + +    async def run_subshell(self, cmd: str) -> typing.Tuple[bytes, bytes]: +        """Run a command in a subshell""" +        proc = await asyncio.create_subprocess_shell( +            cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE +        ) +        return await proc.communicate() -def tunnel_test_callback(task: asyncio.Task) -> None: -    """Tunnel test callback function.""" -    task_name = task.get_name() -    # data_cols["stdout"] = task.result()[0] -    # data_cols["stderr"] = task.result()[1] -    if ( -        task.result()[0].decode("utf-8").strip("\n") -        == data_cols[task_name]["test_command_result"] -    ): -        data_cols[task_name]["status"] = "UP" -    else: -        data_cols[task_name]["status"] = "DOWN" +    def tunnel_test_callback(self, task: asyncio.Task) -> None: +        """Tunnel test callback function.""" +        try: +            task_name = task.get_name() +            self.data_cols[task_name]["stdout"] = ( +                task.result()[0].decode("utf-8").strip("\n") +            ) +            self.data_cols[task_name]["stderr"] = ( +                task.result()[1].decode("utf-8").strip("\n") +            ) +            if ( +                task.result()[0].decode("utf-8").strip("\n") +                == self.data_cols[task_name]["test_command_result"] +            ): +                self.data_cols[task_name]["status"] = "UP" +            else: +                self.data_cols[task_name]["status"] = "DOWN" +        except asyncio.TimeoutError: +            self.data_cols[task_name]["status"] = "TMOUT" + +    async def tunnel_test_procs(self) -> typing.List[asyncio.Task]: +        """run all the tunnel tests in the background as separate tasks""" +        tasks: typing.List[asyncio.Task] = [] +        for _, value in self.data_cols.items(): +            if value["test_command"] != "": +                tasks.append( +                    asyncio.create_task( +                        asyncio.wait_for( +                            self.run_subshell(value["test_command"]), +                            timeout=float(value["test_timeout"]), +                        ), +                        name=value["name"], +                    ) +                ) +                tasks[-1].add_done_callback(self.tunnel_test_callback) +                await asyncio.sleep(0) +        return tasks -async def tunnel_test_procs() -> typing.List[asyncio.Task]: -    """run all the tunnel tests in the background as separate tasks""" -    tasks: typing.List[asyncio.Task] = [] -    for _, value in data_cols.items(): -        if value["test_command"] != "": +    async def tunnel_procs( +        self, +    ) -> typing.List[asyncio.Task]: +        """run all the tunnels in the background as separate tasks""" +        tasks: typing.List[asyncio.Task] = [] +        for _, value in self.data_cols.items():              tasks.append(                  asyncio.create_task( -                    run_subshell(value["test_command"]), name=value["name"] -                ) +                    self.run_subshell(value["command"]), name=value["name"] +                ),              ) -            tasks[-1].add_done_callback(tunnel_test_callback)              await asyncio.sleep(0) -    return tasks - - -async def tunnel_procs(commands: typing.List[str]) -> None: -    """run all the tunnels in the background as separate tasks""" -    for command in commands: -        asyncio.create_task(run_subshell(command)) -        await asyncio.sleep(0) - - -data_cols: typing.Dict[str, typing.Dict] = {} - - -async def main() -> None: -    """entrypoint""" -    argparser = Argparser() -    print(Colors.screen_clear, end="") -    print(Colors.hide_cursor, end="") - -    with open(argparser.args.config, "rb") as conf_file: -        data = tomllib.load(conf_file) -        for key, value in data.items(): -            data_cols[key] = { -                "name": key, -                "address": value["address"], -                "port": value["port"], -                "command": value["command"], -                "status": "UNKN", -                "test_command": value["test_command"], -                "test_command_result": value["test_command_result"], -                "test_interval": value["test_interval"], -                "test_timeout": value["test_timeout"], -                "stdout": "", -                "stderr": "", -            } - -        await tunnel_procs([v["command"] for _, v in data_cols.items()]) +        return tasks + +    # pylint: disable=unused-argument +    async def sighup_handler(self, signum, frame): +        """SIGHUP handler. we want to reload the config.""" +        # type: ignore # pylint: disable=E0203 +        data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} +        with open(self.argparser.args.config, "rb") as conf_file: +            data = tomllib.load(conf_file) +            for key, value in data.items(): +                data_cols_new[key] = { +                    "name": key, +                    "address": value["address"], +                    "port": value["port"], +                    "command": value["command"], +                    "status": "UNKN", +                    "test_command": value["test_command"], +                    "test_command_result": value["test_command_result"], +                    "test_interval": value["test_interval"], +                    "test_timeout": value["test_timeout"], +                    "stdout": "", +                    "stderr": "", +                } + +        for k, value in data_cols_new.items(): +            if k not in self.data_cols: +                self.tunnel_tasks.append( +                    asyncio.create_task( +                        self.run_subshell(value["command"]), name=k +                    ) +                ) +                await asyncio.sleep(0) +            else: +                if ( +                    self.data_cols[k]["command"] != data_cols_new[k]["command"] +                    or self.data_cols[k]["port"] != data_cols_new[k]["port"] +                    or self.data_cols[k]["address"] +                    != data_cols_new[k]["address"] +                ): +                    for task in self.tunnel_tasks: +                        if task.get_name() == k: +                            task.cancel() +                        self.data_cols[k] = copy.deepcopy(data_cols_new[k]) +                        self.tunnel_tasks.append( +                            asyncio.create_task( +                                self.run_subshell(value["command"]), name=k +                            ) +                        ) +                        await asyncio.sleep(0) + +        for k, _ in self.data_cols.items(): +            if k not in data_cols_new: +                for task in self.tunnel_tasks: +                    if task.get_name() == k: +                        task.cancel() +                del self.data_cols[k] + +    async def main(self) -> None: +        """entrypoint""" +        signal.signal(signal.SIGHUP, self.sighup_handler) +        print(Colors.screen_clear, end="") +        print(Colors.hide_cursor, end="") + +        with open(self.argparser.args.config, "rb") as conf_file: +            data = tomllib.load(conf_file) +            for key, value in data.items(): +                self.data_cols[key] = { +                    "name": key, +                    "address": value["address"], +                    "port": value["port"], +                    "command": value["command"], +                    "status": "UNKN", +                    "test_command": value["test_command"], +                    "test_command_result": value["test_command_result"], +                    "test_interval": value["test_interval"], +                    "test_timeout": value["test_timeout"], +                    "stdout": "", +                    "stderr": "", +                } + +        self.tunnel_tasks = await self.tunnel_procs()          while True: -            await tunnel_test_procs() +            self.tunnel_test_tasks = await self.tunnel_test_procs()              lines = ffs(                  2,                  ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] -                if not argparser.args.noheader +                if not self.argparser.args.noheader                  else None,                  False, -                [v["name"] for _, v in data_cols.items()], -                [v["address"] for _, v in data_cols.items()], -                [repr(v["port"]) for _, v in data_cols.items()], -                [v["status"] for _, v in data_cols.items()], -                [v["stdout"] for _, v in data_cols.items()], -                [v["stderr"] for _, v in data_cols.items()], +                [v["name"] for _, v in self.data_cols.items()], +                [v["address"] for _, v in self.data_cols.items()], +                [repr(v["port"]) for _, v in self.data_cols.items()], +                [v["status"] for _, v in self.data_cols.items()], +                [v["stdout"] for _, v in self.data_cols.items()], +                [v["stderr"] for _, v in self.data_cols.items()],              )              for line in lines:                  print(line) -            await asyncio.sleep(argparser.args.delay) -            print(Colors.screen_clear, end="") -            print(Colors.hide_cursor, end="") +            await asyncio.sleep(self.argparser.args.delay) +            # print(Colors.screen_clear, end="") +            # print(Colors.hide_cursor, end="")  if __name__ == "__main__": -    asyncio.run(main()) +    tunnel_top = TunnelTop() +    asyncio.run(tunnel_top.main()) | 
