aboutsummaryrefslogtreecommitdiffstats
path: root/bin/tunneltop
diff options
context:
space:
mode:
Diffstat (limited to 'bin/tunneltop')
-rwxr-xr-xbin/tunneltop231
1 files changed, 155 insertions, 76 deletions
diff --git a/bin/tunneltop b/bin/tunneltop
index ebed5d9..1c65d0d 100755
--- a/bin/tunneltop
+++ b/bin/tunneltop
@@ -3,7 +3,9 @@
import argparse
import asyncio
+import copy
import enum
+import signal
import sys
import typing
@@ -128,103 +130,180 @@ def ffs(
return lines
-async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]:
- """Run a command in a subshell"""
- proc = await asyncio.create_subprocess_shell(
- cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
- )
+class TunnelTop:
+ """The tunnel top class"""
- # return stdout and stderr
- return await proc.communicate()
+ def __init__(self):
+ self.argparser = Argparser()
+ self.data_cols: typing.Dict[str, typing.Dict[str, str]] = {}
+ self.tunnel_tasks: typing.List[asyncio.Task] = []
+ self.tunnel_test_tasks: typing.List[asyncio.Task] = []
+
+ async def run_subshell(self, cmd: str) -> typing.Tuple[bytes, bytes]:
+ """Run a command in a subshell"""
+ proc = await asyncio.create_subprocess_shell(
+ cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+ )
+ return await proc.communicate()
-def tunnel_test_callback(task: asyncio.Task) -> None:
- """Tunnel test callback function."""
- task_name = task.get_name()
- # data_cols["stdout"] = task.result()[0]
- # data_cols["stderr"] = task.result()[1]
- if (
- task.result()[0].decode("utf-8").strip("\n")
- == data_cols[task_name]["test_command_result"]
- ):
- data_cols[task_name]["status"] = "UP"
- else:
- data_cols[task_name]["status"] = "DOWN"
+ def tunnel_test_callback(self, task: asyncio.Task) -> None:
+ """Tunnel test callback function."""
+ try:
+ task_name = task.get_name()
+ self.data_cols[task_name]["stdout"] = (
+ task.result()[0].decode("utf-8").strip("\n")
+ )
+ self.data_cols[task_name]["stderr"] = (
+ task.result()[1].decode("utf-8").strip("\n")
+ )
+ if (
+ task.result()[0].decode("utf-8").strip("\n")
+ == self.data_cols[task_name]["test_command_result"]
+ ):
+ self.data_cols[task_name]["status"] = "UP"
+ else:
+ self.data_cols[task_name]["status"] = "DOWN"
+ except asyncio.TimeoutError:
+ self.data_cols[task_name]["status"] = "TMOUT"
+
+ async def tunnel_test_procs(self) -> typing.List[asyncio.Task]:
+ """run all the tunnel tests in the background as separate tasks"""
+ tasks: typing.List[asyncio.Task] = []
+ for _, value in self.data_cols.items():
+ if value["test_command"] != "":
+ tasks.append(
+ asyncio.create_task(
+ asyncio.wait_for(
+ self.run_subshell(value["test_command"]),
+ timeout=float(value["test_timeout"]),
+ ),
+ name=value["name"],
+ )
+ )
+ tasks[-1].add_done_callback(self.tunnel_test_callback)
+ await asyncio.sleep(0)
+ return tasks
-async def tunnel_test_procs() -> typing.List[asyncio.Task]:
- """run all the tunnel tests in the background as separate tasks"""
- tasks: typing.List[asyncio.Task] = []
- for _, value in data_cols.items():
- if value["test_command"] != "":
+ async def tunnel_procs(
+ self,
+ ) -> typing.List[asyncio.Task]:
+ """run all the tunnels in the background as separate tasks"""
+ tasks: typing.List[asyncio.Task] = []
+ for _, value in self.data_cols.items():
tasks.append(
asyncio.create_task(
- run_subshell(value["test_command"]), name=value["name"]
- )
+ self.run_subshell(value["command"]), name=value["name"]
+ ),
)
- tasks[-1].add_done_callback(tunnel_test_callback)
await asyncio.sleep(0)
- return tasks
-
-
-async def tunnel_procs(commands: typing.List[str]) -> None:
- """run all the tunnels in the background as separate tasks"""
- for command in commands:
- asyncio.create_task(run_subshell(command))
- await asyncio.sleep(0)
-
-
-data_cols: typing.Dict[str, typing.Dict] = {}
-
-
-async def main() -> None:
- """entrypoint"""
- argparser = Argparser()
- print(Colors.screen_clear, end="")
- print(Colors.hide_cursor, end="")
-
- with open(argparser.args.config, "rb") as conf_file:
- data = tomllib.load(conf_file)
- for key, value in data.items():
- data_cols[key] = {
- "name": key,
- "address": value["address"],
- "port": value["port"],
- "command": value["command"],
- "status": "UNKN",
- "test_command": value["test_command"],
- "test_command_result": value["test_command_result"],
- "test_interval": value["test_interval"],
- "test_timeout": value["test_timeout"],
- "stdout": "",
- "stderr": "",
- }
-
- await tunnel_procs([v["command"] for _, v in data_cols.items()])
+ return tasks
+
+ # pylint: disable=unused-argument
+ async def sighup_handler(self, signum, frame):
+ """SIGHUP handler. we want to reload the config."""
+ # type: ignore # pylint: disable=E0203
+ data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {}
+ with open(self.argparser.args.config, "rb") as conf_file:
+ data = tomllib.load(conf_file)
+ for key, value in data.items():
+ data_cols_new[key] = {
+ "name": key,
+ "address": value["address"],
+ "port": value["port"],
+ "command": value["command"],
+ "status": "UNKN",
+ "test_command": value["test_command"],
+ "test_command_result": value["test_command_result"],
+ "test_interval": value["test_interval"],
+ "test_timeout": value["test_timeout"],
+ "stdout": "",
+ "stderr": "",
+ }
+
+ for k, value in data_cols_new.items():
+ if k not in self.data_cols:
+ self.tunnel_tasks.append(
+ asyncio.create_task(
+ self.run_subshell(value["command"]), name=k
+ )
+ )
+ await asyncio.sleep(0)
+ else:
+ if (
+ self.data_cols[k]["command"] != data_cols_new[k]["command"]
+ or self.data_cols[k]["port"] != data_cols_new[k]["port"]
+ or self.data_cols[k]["address"]
+ != data_cols_new[k]["address"]
+ ):
+ for task in self.tunnel_tasks:
+ if task.get_name() == k:
+ task.cancel()
+ self.data_cols[k] = copy.deepcopy(data_cols_new[k])
+ self.tunnel_tasks.append(
+ asyncio.create_task(
+ self.run_subshell(value["command"]), name=k
+ )
+ )
+ await asyncio.sleep(0)
+
+ for k, _ in self.data_cols.items():
+ if k not in data_cols_new:
+ for task in self.tunnel_tasks:
+ if task.get_name() == k:
+ task.cancel()
+ del self.data_cols[k]
+
+ async def main(self) -> None:
+ """entrypoint"""
+ signal.signal(signal.SIGHUP, self.sighup_handler)
+ print(Colors.screen_clear, end="")
+ print(Colors.hide_cursor, end="")
+
+ with open(self.argparser.args.config, "rb") as conf_file:
+ data = tomllib.load(conf_file)
+ for key, value in data.items():
+ self.data_cols[key] = {
+ "name": key,
+ "address": value["address"],
+ "port": value["port"],
+ "command": value["command"],
+ "status": "UNKN",
+ "test_command": value["test_command"],
+ "test_command_result": value["test_command_result"],
+ "test_interval": value["test_interval"],
+ "test_timeout": value["test_timeout"],
+ "stdout": "",
+ "stderr": "",
+ }
+
+ self.tunnel_tasks = await self.tunnel_procs()
while True:
- await tunnel_test_procs()
+ self.tunnel_test_tasks = await self.tunnel_test_procs()
lines = ffs(
2,
["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"]
- if not argparser.args.noheader
+ if not self.argparser.args.noheader
else None,
False,
- [v["name"] for _, v in data_cols.items()],
- [v["address"] for _, v in data_cols.items()],
- [repr(v["port"]) for _, v in data_cols.items()],
- [v["status"] for _, v in data_cols.items()],
- [v["stdout"] for _, v in data_cols.items()],
- [v["stderr"] for _, v in data_cols.items()],
+ [v["name"] for _, v in self.data_cols.items()],
+ [v["address"] for _, v in self.data_cols.items()],
+ [repr(v["port"]) for _, v in self.data_cols.items()],
+ [v["status"] for _, v in self.data_cols.items()],
+ [v["stdout"] for _, v in self.data_cols.items()],
+ [v["stderr"] for _, v in self.data_cols.items()],
)
for line in lines:
print(line)
- await asyncio.sleep(argparser.args.delay)
- print(Colors.screen_clear, end="")
- print(Colors.hide_cursor, end="")
+ await asyncio.sleep(self.argparser.args.delay)
+ # print(Colors.screen_clear, end="")
+ # print(Colors.hide_cursor, end="")
if __name__ == "__main__":
- asyncio.run(main())
+ tunnel_top = TunnelTop()
+ asyncio.run(tunnel_top.main())