diff options
author | terminaldweller <thabogre@gmail.com> | 2023-01-17 08:01:01 +0000 |
---|---|---|
committer | terminaldweller <thabogre@gmail.com> | 2023-01-17 08:01:01 +0000 |
commit | 31daa0e705af8bf4db8bfbf0984b32b293b8f97f (patch) | |
tree | 6e9f088eb1e5b68b945a733808c98c09670f90cb /bin/tunneltop | |
parent | update (diff) | |
download | scripts-31daa0e705af8bf4db8bfbf0984b32b293b8f97f.tar.gz scripts-31daa0e705af8bf4db8bfbf0984b32b293b8f97f.zip |
updates
Diffstat (limited to '')
-rwxr-xr-x | bin/tunneltop | 231 |
1 files changed, 155 insertions, 76 deletions
diff --git a/bin/tunneltop b/bin/tunneltop index ebed5d9..1c65d0d 100755 --- a/bin/tunneltop +++ b/bin/tunneltop @@ -3,7 +3,9 @@ import argparse import asyncio +import copy import enum +import signal import sys import typing @@ -128,103 +130,180 @@ def ffs( return lines -async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]: - """Run a command in a subshell""" - proc = await asyncio.create_subprocess_shell( - cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) +class TunnelTop: + """The tunnel top class""" - # return stdout and stderr - return await proc.communicate() + def __init__(self): + self.argparser = Argparser() + self.data_cols: typing.Dict[str, typing.Dict[str, str]] = {} + self.tunnel_tasks: typing.List[asyncio.Task] = [] + self.tunnel_test_tasks: typing.List[asyncio.Task] = [] + + async def run_subshell(self, cmd: str) -> typing.Tuple[bytes, bytes]: + """Run a command in a subshell""" + proc = await asyncio.create_subprocess_shell( + cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + return await proc.communicate() -def tunnel_test_callback(task: asyncio.Task) -> None: - """Tunnel test callback function.""" - task_name = task.get_name() - # data_cols["stdout"] = task.result()[0] - # data_cols["stderr"] = task.result()[1] - if ( - task.result()[0].decode("utf-8").strip("\n") - == data_cols[task_name]["test_command_result"] - ): - data_cols[task_name]["status"] = "UP" - else: - data_cols[task_name]["status"] = "DOWN" + def tunnel_test_callback(self, task: asyncio.Task) -> None: + """Tunnel test callback function.""" + try: + task_name = task.get_name() + self.data_cols[task_name]["stdout"] = ( + task.result()[0].decode("utf-8").strip("\n") + ) + self.data_cols[task_name]["stderr"] = ( + task.result()[1].decode("utf-8").strip("\n") + ) + if ( + task.result()[0].decode("utf-8").strip("\n") + == self.data_cols[task_name]["test_command_result"] + ): + self.data_cols[task_name]["status"] = "UP" + else: + self.data_cols[task_name]["status"] = "DOWN" + except asyncio.TimeoutError: + self.data_cols[task_name]["status"] = "TMOUT" + + async def tunnel_test_procs(self) -> typing.List[asyncio.Task]: + """run all the tunnel tests in the background as separate tasks""" + tasks: typing.List[asyncio.Task] = [] + for _, value in self.data_cols.items(): + if value["test_command"] != "": + tasks.append( + asyncio.create_task( + asyncio.wait_for( + self.run_subshell(value["test_command"]), + timeout=float(value["test_timeout"]), + ), + name=value["name"], + ) + ) + tasks[-1].add_done_callback(self.tunnel_test_callback) + await asyncio.sleep(0) + return tasks -async def tunnel_test_procs() -> typing.List[asyncio.Task]: - """run all the tunnel tests in the background as separate tasks""" - tasks: typing.List[asyncio.Task] = [] - for _, value in data_cols.items(): - if value["test_command"] != "": + async def tunnel_procs( + self, + ) -> typing.List[asyncio.Task]: + """run all the tunnels in the background as separate tasks""" + tasks: typing.List[asyncio.Task] = [] + for _, value in self.data_cols.items(): tasks.append( asyncio.create_task( - run_subshell(value["test_command"]), name=value["name"] - ) + self.run_subshell(value["command"]), name=value["name"] + ), ) - tasks[-1].add_done_callback(tunnel_test_callback) await asyncio.sleep(0) - return tasks - - -async def tunnel_procs(commands: typing.List[str]) -> None: - """run all the tunnels in the background as separate tasks""" - for command in commands: - asyncio.create_task(run_subshell(command)) - await asyncio.sleep(0) - - -data_cols: typing.Dict[str, typing.Dict] = {} - - -async def main() -> None: - """entrypoint""" - argparser = Argparser() - print(Colors.screen_clear, end="") - print(Colors.hide_cursor, end="") - - with open(argparser.args.config, "rb") as conf_file: - data = tomllib.load(conf_file) - for key, value in data.items(): - data_cols[key] = { - "name": key, - "address": value["address"], - "port": value["port"], - "command": value["command"], - "status": "UNKN", - "test_command": value["test_command"], - "test_command_result": value["test_command_result"], - "test_interval": value["test_interval"], - "test_timeout": value["test_timeout"], - "stdout": "", - "stderr": "", - } - - await tunnel_procs([v["command"] for _, v in data_cols.items()]) + return tasks + + # pylint: disable=unused-argument + async def sighup_handler(self, signum, frame): + """SIGHUP handler. we want to reload the config.""" + # type: ignore # pylint: disable=E0203 + data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {} + with open(self.argparser.args.config, "rb") as conf_file: + data = tomllib.load(conf_file) + for key, value in data.items(): + data_cols_new[key] = { + "name": key, + "address": value["address"], + "port": value["port"], + "command": value["command"], + "status": "UNKN", + "test_command": value["test_command"], + "test_command_result": value["test_command_result"], + "test_interval": value["test_interval"], + "test_timeout": value["test_timeout"], + "stdout": "", + "stderr": "", + } + + for k, value in data_cols_new.items(): + if k not in self.data_cols: + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subshell(value["command"]), name=k + ) + ) + await asyncio.sleep(0) + else: + if ( + self.data_cols[k]["command"] != data_cols_new[k]["command"] + or self.data_cols[k]["port"] != data_cols_new[k]["port"] + or self.data_cols[k]["address"] + != data_cols_new[k]["address"] + ): + for task in self.tunnel_tasks: + if task.get_name() == k: + task.cancel() + self.data_cols[k] = copy.deepcopy(data_cols_new[k]) + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subshell(value["command"]), name=k + ) + ) + await asyncio.sleep(0) + + for k, _ in self.data_cols.items(): + if k not in data_cols_new: + for task in self.tunnel_tasks: + if task.get_name() == k: + task.cancel() + del self.data_cols[k] + + async def main(self) -> None: + """entrypoint""" + signal.signal(signal.SIGHUP, self.sighup_handler) + print(Colors.screen_clear, end="") + print(Colors.hide_cursor, end="") + + with open(self.argparser.args.config, "rb") as conf_file: + data = tomllib.load(conf_file) + for key, value in data.items(): + self.data_cols[key] = { + "name": key, + "address": value["address"], + "port": value["port"], + "command": value["command"], + "status": "UNKN", + "test_command": value["test_command"], + "test_command_result": value["test_command_result"], + "test_interval": value["test_interval"], + "test_timeout": value["test_timeout"], + "stdout": "", + "stderr": "", + } + + self.tunnel_tasks = await self.tunnel_procs() while True: - await tunnel_test_procs() + self.tunnel_test_tasks = await self.tunnel_test_procs() lines = ffs( 2, ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] - if not argparser.args.noheader + if not self.argparser.args.noheader else None, False, - [v["name"] for _, v in data_cols.items()], - [v["address"] for _, v in data_cols.items()], - [repr(v["port"]) for _, v in data_cols.items()], - [v["status"] for _, v in data_cols.items()], - [v["stdout"] for _, v in data_cols.items()], - [v["stderr"] for _, v in data_cols.items()], + [v["name"] for _, v in self.data_cols.items()], + [v["address"] for _, v in self.data_cols.items()], + [repr(v["port"]) for _, v in self.data_cols.items()], + [v["status"] for _, v in self.data_cols.items()], + [v["stdout"] for _, v in self.data_cols.items()], + [v["stderr"] for _, v in self.data_cols.items()], ) for line in lines: print(line) - await asyncio.sleep(argparser.args.delay) - print(Colors.screen_clear, end="") - print(Colors.hide_cursor, end="") + await asyncio.sleep(self.argparser.args.delay) + # print(Colors.screen_clear, end="") + # print(Colors.hide_cursor, end="") if __name__ == "__main__": - asyncio.run(main()) + tunnel_top = TunnelTop() + asyncio.run(tunnel_top.main()) |