aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pyproject.toml2
-rwxr-xr-xtunneltop/tunneltop.py92
2 files changed, 52 insertions, 42 deletions
diff --git a/pyproject.toml b/pyproject.toml
index 2724253..b243c4e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tunneltop"
-version = "0.1.3"
+version = "0.1.4"
description = "A top-like tunnel manager"
authors = ["terminaldwelelr <devi@terminaldweller.com>"]
license = "GPL-3.0"
diff --git a/tunneltop/tunneltop.py b/tunneltop/tunneltop.py
index 21c6339..0e92578 100755
--- a/tunneltop/tunneltop.py
+++ b/tunneltop/tunneltop.py
@@ -210,7 +210,7 @@ class TunnelManager:
str, typing.Dict[str, str]
] = self.read_conf()
self.tunnel_tasks: typing.List[asyncio.Task] = []
- self.tunnel_test_tasks: typing.List[asyncio.Task] = []
+ self.tunnel_test_tasks: typing.Dict[str, asyncio.Task] = {}
self.scheduler_task: asyncio.Task
self.scheduler_table: typing.Dict[
str, int
@@ -237,6 +237,9 @@ class TunnelManager:
"""Remove the reference"""
delete_index: int = -1
delete_task.cancel()
+ self.data_cols[delete_task.get_name()]["status"] = "UNKWN"
+ self.data_cols[delete_task.get_name()]["stdout"] = "n/a"
+ self.data_cols[delete_task.get_name()]["stderr"] = "n/a"
self.write_log(f"{delete_task.get_name()} is being cancelled\n")
await asyncio.sleep(0)
for i, task in enumerate(task_list):
@@ -260,13 +263,13 @@ class TunnelManager:
"address": value["address"],
"port": value["port"],
"command": value["command"],
- "status": "UNKN",
+ "status": "UNKWN",
"test_command": value["test_command"],
"test_command_result": value["test_command_result"],
"test_interval": value["test_interval"],
"test_timeout": value["test_timeout"],
- "stdout": "",
- "stderr": "",
+ "stdout": "n/a",
+ "stderr": "n/a",
}
return data_cols
@@ -287,12 +290,15 @@ class TunnelManager:
proc.terminate()
raise
- async def run_test_coro(
- self, cmd: str, task_name: str
- ) -> typing.Tuple[bytes, bytes]:
+ async def run_test_coro(self, cmd: str, task_name: str) -> None:
"""Run a test command"""
try:
- stdout, stderr = await self.run_subprocess(cmd)
+ async with asyncio.timeout_at(
+ asyncio.get_event_loop().time()
+ + float(self.data_cols[task_name]["test_timeout"])
+ ):
+ self.write_log("running test for " + task_name + "\n")
+ stdout, stderr = await self.run_subprocess(cmd)
stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"')
stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"')
@@ -303,11 +309,15 @@ class TunnelManager:
else:
self.data_cols[task_name]["status"] = "DOWN"
- return stdout, stderr
+ del self.tunnel_test_tasks[task_name]
+ # return stdout, stderr
except asyncio.TimeoutError:
+ self.write_log(f"test for {task_name} timed out\n")
self.data_cols[task_name]["status"] = "TMOUT"
self.data_cols[task_name]["stdout"] = "-"
self.data_cols[task_name]["stderr"] = "-"
+ self.tunnel_test_tasks[task_name].cancel()
+ del self.tunnel_test_tasks[task_name]
raise
async def tunnel_procs(
@@ -399,15 +409,15 @@ class TunnelManager:
for task in self.tunnel_tasks:
if task.get_name() == name:
await self.stop_task(task, self.tunnel_tasks)
- for _, value in self.data_cols.items():
- if value["name"] == name and task.cancelled():
- self.tunnel_tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]),
- name=value["name"],
- )
- )
- await asyncio.sleep(0)
+ tunnel_entry = self.data_cols[name]
+ self.tunnel_tasks.append(
+ asyncio.create_task(
+ self.run_subprocess(tunnel_entry["command"]),
+ name=tunnel_entry["name"],
+ )
+ )
+ await asyncio.sleep(0)
+ break
async def flip_task(self, line_content: str) -> None:
"""flip a task"""
@@ -420,15 +430,14 @@ class TunnelManager:
break
if not was_active:
- for _, value in self.data_cols.items():
- if value["name"] == name:
- self.tunnel_tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]),
- name=value["name"],
- )
- )
- await asyncio.sleep(0)
+ tunnel_entry = self.data_cols[name]
+ self.tunnel_tasks.append(
+ asyncio.create_task(
+ self.run_subprocess(tunnel_entry["command"]),
+ name=tunnel_entry["name"],
+ )
+ )
+ await asyncio.sleep(0)
async def quit(self) -> None:
"""Cleanly quit the applicaiton"""
@@ -452,33 +461,34 @@ class TunnelManager:
return
for key, value in self.scheduler_table.items():
if value == 0 and key not in self.tunnel_test_tasks:
+ self.write_log("rescheduling test for " + key + "\n")
tunnel_entry = self.data_cols[key]
test_task = asyncio.create_task(
- asyncio.wait_for(
- self.run_test_coro(
- tunnel_entry["test_command"],
- tunnel_entry["name"],
- ),
- timeout=float(tunnel_entry["test_timeout"]),
+ self.run_test_coro(
+ tunnel_entry["test_command"],
+ tunnel_entry["name"],
),
name=key,
)
- self.tunnel_test_tasks.append(test_task)
- self.scheduler_table[key] = int(
- tunnel_entry["test_interval"]
- )
await asyncio.sleep(0)
- self.scheduler_table[key] = int(
- self.data_cols[key]["test_interval"]
- )
- else:
+ self.tunnel_test_tasks[key] = test_task
+ if value > 0:
self.scheduler_table[key] = (
self.scheduler_table[key] - 1
)
+ if value <= 0:
+ self.write_log("revitalizing test for " + key + "\n")
+ self.scheduler_table[key] = int(
+ self.data_cols[key]["test_interval"]
+ )
# we are using a 1 second ticker. basically the scheduler
# runs every second instead of as fast as it can
await asyncio.sleep(1)
+ for test_task_name in self.tunnel_test_tasks.keys():
+ self.write_log(test_task_name + " ")
+ self.write_log(repr(self.scheduler_table))
+ self.write_log("\n")
except asyncio.CancelledError:
pass