From 20c06eac1c7089551397710b142cf44fdaa30a3f Mon Sep 17 00:00:00 2001 From: terminaldweller Date: Tue, 7 Feb 2023 12:17:50 +0330 Subject: fix a bug with restart where it would kill but not restart a tunnel --- pyproject.toml | 2 +- tunneltop/tunneltop.py | 92 ++++++++++++++++++++++++++++---------------------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2724253..b243c4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tunneltop" -version = "0.1.3" +version = "0.1.4" description = "A top-like tunnel manager" authors = ["terminaldwelelr "] license = "GPL-3.0" diff --git a/tunneltop/tunneltop.py b/tunneltop/tunneltop.py index 21c6339..0e92578 100755 --- a/tunneltop/tunneltop.py +++ b/tunneltop/tunneltop.py @@ -210,7 +210,7 @@ class TunnelManager: str, typing.Dict[str, str] ] = self.read_conf() self.tunnel_tasks: typing.List[asyncio.Task] = [] - self.tunnel_test_tasks: typing.List[asyncio.Task] = [] + self.tunnel_test_tasks: typing.Dict[str, asyncio.Task] = {} self.scheduler_task: asyncio.Task self.scheduler_table: typing.Dict[ str, int @@ -237,6 +237,9 @@ class TunnelManager: """Remove the reference""" delete_index: int = -1 delete_task.cancel() + self.data_cols[delete_task.get_name()]["status"] = "UNKWN" + self.data_cols[delete_task.get_name()]["stdout"] = "n/a" + self.data_cols[delete_task.get_name()]["stderr"] = "n/a" self.write_log(f"{delete_task.get_name()} is being cancelled\n") await asyncio.sleep(0) for i, task in enumerate(task_list): @@ -260,13 +263,13 @@ class TunnelManager: "address": value["address"], "port": value["port"], "command": value["command"], - "status": "UNKN", + "status": "UNKWN", "test_command": value["test_command"], "test_command_result": value["test_command_result"], "test_interval": value["test_interval"], "test_timeout": value["test_timeout"], - "stdout": "", - "stderr": "", + "stdout": "n/a", + "stderr": "n/a", } return data_cols @@ -287,12 +290,15 @@ class TunnelManager: proc.terminate() raise - async def run_test_coro( - self, cmd: str, task_name: str - ) -> typing.Tuple[bytes, bytes]: + async def run_test_coro(self, cmd: str, task_name: str) -> None: """Run a test command""" try: - stdout, stderr = await self.run_subprocess(cmd) + async with asyncio.timeout_at( + asyncio.get_event_loop().time() + + float(self.data_cols[task_name]["test_timeout"]) + ): + self.write_log("running test for " + task_name + "\n") + stdout, stderr = await self.run_subprocess(cmd) stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"') stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"') @@ -303,11 +309,15 @@ class TunnelManager: else: self.data_cols[task_name]["status"] = "DOWN" - return stdout, stderr + del self.tunnel_test_tasks[task_name] + # return stdout, stderr except asyncio.TimeoutError: + self.write_log(f"test for {task_name} timed out\n") self.data_cols[task_name]["status"] = "TMOUT" self.data_cols[task_name]["stdout"] = "-" self.data_cols[task_name]["stderr"] = "-" + self.tunnel_test_tasks[task_name].cancel() + del self.tunnel_test_tasks[task_name] raise async def tunnel_procs( @@ -399,15 +409,15 @@ class TunnelManager: for task in self.tunnel_tasks: if task.get_name() == name: await self.stop_task(task, self.tunnel_tasks) - for _, value in self.data_cols.items(): - if value["name"] == name and task.cancelled(): - self.tunnel_tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), - name=value["name"], - ) - ) - await asyncio.sleep(0) + tunnel_entry = self.data_cols[name] + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subprocess(tunnel_entry["command"]), + name=tunnel_entry["name"], + ) + ) + await asyncio.sleep(0) + break async def flip_task(self, line_content: str) -> None: """flip a task""" @@ -420,15 +430,14 @@ class TunnelManager: break if not was_active: - for _, value in self.data_cols.items(): - if value["name"] == name: - self.tunnel_tasks.append( - asyncio.create_task( - self.run_subprocess(value["command"]), - name=value["name"], - ) - ) - await asyncio.sleep(0) + tunnel_entry = self.data_cols[name] + self.tunnel_tasks.append( + asyncio.create_task( + self.run_subprocess(tunnel_entry["command"]), + name=tunnel_entry["name"], + ) + ) + await asyncio.sleep(0) async def quit(self) -> None: """Cleanly quit the applicaiton""" @@ -452,33 +461,34 @@ class TunnelManager: return for key, value in self.scheduler_table.items(): if value == 0 and key not in self.tunnel_test_tasks: + self.write_log("rescheduling test for " + key + "\n") tunnel_entry = self.data_cols[key] test_task = asyncio.create_task( - asyncio.wait_for( - self.run_test_coro( - tunnel_entry["test_command"], - tunnel_entry["name"], - ), - timeout=float(tunnel_entry["test_timeout"]), + self.run_test_coro( + tunnel_entry["test_command"], + tunnel_entry["name"], ), name=key, ) - self.tunnel_test_tasks.append(test_task) - self.scheduler_table[key] = int( - tunnel_entry["test_interval"] - ) await asyncio.sleep(0) - self.scheduler_table[key] = int( - self.data_cols[key]["test_interval"] - ) - else: + self.tunnel_test_tasks[key] = test_task + if value > 0: self.scheduler_table[key] = ( self.scheduler_table[key] - 1 ) + if value <= 0: + self.write_log("revitalizing test for " + key + "\n") + self.scheduler_table[key] = int( + self.data_cols[key]["test_interval"] + ) # we are using a 1 second ticker. basically the scheduler # runs every second instead of as fast as it can await asyncio.sleep(1) + for test_task_name in self.tunnel_test_tasks.keys(): + self.write_log(test_task_name + " ") + self.write_log(repr(self.scheduler_table)) + self.write_log("\n") except asyncio.CancelledError: pass -- cgit v1.2.3