aboutsummaryrefslogtreecommitdiffstats
path: root/bin
diff options
context:
space:
mode:
Diffstat (limited to 'bin')
-rwxr-xr-xbin/tunneltop554
1 files changed, 0 insertions, 554 deletions
diff --git a/bin/tunneltop b/bin/tunneltop
deleted file mode 100755
index df3510c..0000000
--- a/bin/tunneltop
+++ /dev/null
@@ -1,554 +0,0 @@
-#!/usr/bin/env python
-"""A top-like program for monitoring ssh tunnels"""
-# TODO- task cancellation is very slow as should be with tasks
-import argparse
-import asyncio
-import copy
-import curses
-import enum
-import os
-import signal
-import sys
-import tomllib
-import typing
-
-
-class Argparser: # pylint: disable=too-few-public-methods
- """Argparser class."""
-
- def __init__(self):
- self.parser = argparse.ArgumentParser()
- self.parser.add_argument(
- "--config",
- "-c",
- type=str,
- help="The path to the .tunneltop.toml file,"
- " defaults to $HOME/.tunneltop.toml",
- default="~/.tunneltop.toml",
- )
- self.parser.add_argument(
- "--noheader",
- "-n",
- type=bool,
- help="Dont print the header in the output",
- default=False,
- )
- self.parser.add_argument(
- "--delay",
- "-d",
- type=float,
- help="The delay between redraws in seconds, defaults to 5 seconds",
- default=5,
- )
- self.args = self.parser.parse_args()
-
-
-# pylint: disable=too-few-public-methods
-class Colors(enum.EnumType):
- """static color definitions"""
-
- purple = "\033[95m"
- blue = "\033[94m"
- green = "\033[92m"
- yellow = "\033[93m"
- red = "\033[91m"
- grey = "\033[1;37m"
- darkgrey = "\033[1;30m"
- cyan = "\033[1;36m"
- ENDC = "\033[0m"
- BOLD = "\033[1m"
- UNDERLINE = "\033[4m"
- blueblue = "\x1b[38;5;24m"
- greenie = "\x1b[38;5;23m"
- goo = "\x1b[38;5;22m"
- screen_clear = "\033c\033[3J"
- hide_cursor = "\033[?25l"
-
-
-# pylint: disable=too-many-locals
-def ffs(
- offset: int,
- header_list: typing.Optional[typing.List[str]],
- numbered: bool,
- no_color: bool,
- *args,
-) -> typing.List[str]:
- """A simple columnar printer"""
- max_column_width = []
- lines = []
- numbers_f: typing.List[int] = []
- dummy = []
-
- if no_color or not sys.stdout.isatty():
- greenie = ""
- bold = ""
- endc = ""
- goo = ""
- blueblue = ""
- else:
- greenie = Colors.greenie
- bold = Colors.BOLD
- endc = Colors.ENDC
- goo = Colors.goo
- blueblue = Colors.blueblue
-
- for arg in args:
- max_column_width.append(max(len(repr(argette)) for argette in arg))
-
- if header_list is not None:
- if numbered:
- numbers_f.extend(range(1, len(args[-1]) + 1))
- max_column_width.append(
- max(len(repr(number)) for number in numbers_f)
- )
- header_list.insert(0, "idx")
-
- index = range(0, len(header_list))
- for header, width, i in zip(header_list, max_column_width, index):
- max_column_width[i] = max(len(header), width) + offset
-
- for i in index:
- dummy.append(
- greenie
- + bold
- + header_list[i].ljust(max_column_width[i])
- + endc
- )
- lines.append("".join(dummy))
- dummy.clear()
-
- index2 = range(0, len(args[-1]))
- for i in index2:
- if numbered:
- dummy.append(
- goo + bold + repr(i).ljust(max_column_width[0]) + endc
- )
- for arg, width in zip(args, max_column_width[1:]):
- dummy.append(blueblue + (arg[i]).ljust(width) + endc)
- else:
- for arg, width in zip(args, max_column_width):
- dummy.append(blueblue + (arg[i]).ljust(width) + endc)
- lines.append("".join(dummy))
- dummy.clear()
- return lines
-
-
-def render(
- data_cols: typing.Dict[str, typing.Dict[str, str]],
- tasks: typing.List[asyncio.Task],
- stdscr,
- sel: int,
-):
- """Render the text"""
- lines = ffs(
- 2,
- ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"],
- False,
- True,
- [v["name"] for _, v in data_cols.items()],
- [v["address"] for _, v in data_cols.items()],
- [repr(v["port"]) for _, v in data_cols.items()],
- [v["status"] for _, v in data_cols.items()],
- [v["stdout"] for _, v in data_cols.items()],
- [v["stderr"] for _, v in data_cols.items()],
- )
- iterator = iter(lines)
- stdscr.addstr(1, 1, lines[0], curses.color_pair(3))
- next(iterator)
- for i, line in enumerate(iterator):
- try:
- line_content = stdscr.instr(sel + 2, 1).decode("utf-8")
- name: str = line_content[: line_content.find(" ")]
- finally:
- name = ""
- if i == sel:
- stdscr.addstr(
- (2 + i) % (len(lines) + 1),
- 1,
- line,
- curses.color_pair(2)
- if name not in tasks
- else curses.color_pair(5),
- )
- else:
- stdscr.addstr(
- 2 + i,
- 1,
- line,
- curses.color_pair(1)
- if name not in tasks
- else curses.color_pair(4),
- )
- stdscr.addstr("\n")
- stdscr.box()
-
-
-def curses_init():
- """Initialize ncurses"""
- stdscr = curses.initscr()
- curses.start_color()
- curses.use_default_colors()
- curses.curs_set(False)
- curses.noecho()
- curses.cbreak()
- stdscr.keypad(True)
- curses.halfdelay(20)
- curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
- curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN)
- curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK)
- curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK)
- curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_CYAN)
-
- return stdscr
-
-
-class TunnelManager:
- """The tunnel top class"""
-
- def __init__(self):
- self.argparser = Argparser()
- self.data_cols: typing.Dict[
- str, typing.Dict[str, str]
- ] = self.read_conf()
- self.tunnel_tasks: typing.List[asyncio.Task] = []
- self.tunnel_test_tasks: typing.List[asyncio.Task] = []
- self.scheduler_task: asyncio.Task
- self.scheduler_table: typing.Dict[
- str, int
- ] = self.init_scheduler_table()
- # we use this when its time to quit. this will prevent any
- # new tasks from being scheduled
- self.are_we_dying: bool = False
-
- def init_scheduler_table(self) -> typing.Dict[str, int]:
- """initialize the scheduler table"""
- result: typing.Dict[str, int] = {}
- for key, value in self.data_cols.items():
- if "test_interval" in value and value["test_command"] != "":
- result[key] = 0
-
- return result
-
- async def stop_task(
- self,
- delete_task: asyncio.Task,
- task_list: typing.List[asyncio.Task],
- delete: bool = True,
- ):
- """Remove the reference"""
- delete_index: int = -1
- delete_task.cancel()
- self.write_log(f"{delete_task.get_name()} is being cancelled\n")
- await asyncio.sleep(0)
- for i, task in enumerate(task_list):
- if task.get_name() == delete_task.get_name():
- delete_index = i
- break
-
- if delete and delete_index >= 0:
- task_list.remove(self.tunnel_tasks[delete_index])
-
- def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]:
- """Read the config file"""
- data_cols: typing.Dict[str, typing.Dict[str, str]] = {}
- with open(
- os.path.expanduser(self.argparser.args.config), "rb"
- ) as conf_file:
- data = tomllib.load(conf_file)
- for key, value in data.items():
- data_cols[key] = {
- "name": key,
- "address": value["address"],
- "port": value["port"],
- "command": value["command"],
- "status": "UNKN",
- "test_command": value["test_command"],
- "test_command_result": value["test_command_result"],
- "test_interval": value["test_interval"],
- "test_timeout": value["test_timeout"],
- "stdout": "",
- "stderr": "",
- }
- return data_cols
-
- async def run_subprocess(self, cmd: str) -> typing.Tuple[bytes, bytes]:
- """Run a command"""
- try:
- proc = await asyncio.create_subprocess_exec(
- *cmd.split(" "),
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
-
- return await proc.communicate()
- except asyncio.TimeoutError:
- proc.terminate()
- return (bytes(), bytes())
- except asyncio.CancelledError:
- proc.terminate()
- raise
-
- async def run_test_coro(
- self, cmd: str, task_name: str
- ) -> typing.Tuple[bytes, bytes]:
- """Run a test command"""
- try:
- stdout, stderr = await self.run_subprocess(cmd)
- stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"')
- stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"')
-
- self.data_cols[task_name]["stdout"] = stdout_str
- self.data_cols[task_name]["stderr"] = stderr_str
- if stdout_str == self.data_cols[task_name]["test_command_result"]:
- self.data_cols[task_name]["status"] = "UP"
- else:
- self.data_cols[task_name]["status"] = "DOWN"
-
- return stdout, stderr
- except asyncio.TimeoutError:
- self.data_cols[task_name]["status"] = "TMOUT"
- raise
-
- async def tunnel_procs(
- self,
- ) -> typing.List[asyncio.Task]:
- """run all the tunnels in the background as separate tasks"""
- tasks: typing.List[asyncio.Task] = []
- for _, value in self.data_cols.items():
- tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]), name=value["name"]
- ),
- )
- await asyncio.sleep(0)
-
- return tasks
-
- async def sighup_handler_async_worker(self, data_cols_new) -> None:
- """Handles the actual updating of tasks when we get SIGTERM"""
- delete_task: typing.Optional[asyncio.Task] = None
- for k, value in data_cols_new.items():
- if k not in self.data_cols:
- self.tunnel_tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]), name=k
- )
- )
- await asyncio.sleep(0)
- self.data_cols[k] = copy.deepcopy(value)
- if k in self.scheduler_table:
- self.scheduler_table[k] = 0
- else:
- if (
- self.data_cols[k]["command"] != data_cols_new[k]["command"]
- or self.data_cols[k]["port"] != data_cols_new[k]["port"]
- or self.data_cols[k]["address"]
- != data_cols_new[k]["address"]
- ):
- for task in self.tunnel_tasks:
- if task.get_name() == k:
- delete_task = task
- break
- # task.cancel()
- # await asyncio.sleep(0)
-
- if delete_task is not None:
- await self.stop_task(delete_task, self.tunnel_tasks)
- delete_task = None
- self.data_cols[k] = copy.deepcopy(data_cols_new[k])
- self.tunnel_tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]), name=k
- )
- )
- if k in self.scheduler_table:
- self.scheduler_table[k] = 0
- await asyncio.sleep(0)
-
- for k, _ in self.data_cols.items():
- if k not in data_cols_new:
- for task in self.tunnel_tasks:
- if task.get_name() == k:
- # task.cancel()
- # await asyncio.sleep(0)
- delete_task = task
- break
- if delete_task is not None:
- await self.stop_task(delete_task, self.tunnel_tasks)
- delete_task = None
- del self.data_cols[k]
- if k in self.scheduler_table:
- del self.scheduler_table[k]
-
- async def sighup_handler(self) -> None:
- """SIGHUP handler. we want to reload the config."""
- # type: ignore # pylint: disable=E0203
- data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {}
- data_cols_new = self.read_conf()
- await self.sighup_handler_async_worker(data_cols_new)
-
- def write_log(self, log: str):
- """A simple logger"""
- with open(
- "/home/devi/devi/abbatoir/hole15/log",
- "a",
- encoding="utf-8",
- ) as logfile:
- logfile.write(log)
-
- async def restart_task(self, line_content: str) -> None:
- """restart a task"""
- name: str = line_content[: line_content.find(" ")]
- # was_cancelled: bool = False
- for task in self.tunnel_tasks:
- if task.get_name() == name:
- # was_cancelled = task.cancel()
- # self.write_log(f"was_cancelled: {was_cancelled}")
- await self.stop_task(task, self.tunnel_tasks)
- # await task
- # await asyncio.sleep(0)
- for _, value in self.data_cols.items():
- if value["name"] == name and task.cancelled():
- self.tunnel_tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]),
- name=value["name"],
- )
- )
- await asyncio.sleep(0)
-
- async def flip_task(self, line_content: str) -> None:
- """flip a task"""
- name: str = line_content[: line_content.find(" ")]
- # was_cancelled: bool = False
- was_active: bool = False
- for task in self.tunnel_tasks:
- if task.get_name() == name:
- await self.stop_task(task, self.tunnel_tasks)
- # was_cancelled = task.cancel()
- # await asyncio.sleep(0)
- # self.write_log(f"was_cancelled: {was_cancelled}")
- # await task
- was_active = True
- break
-
- if not was_active:
- for _, value in self.data_cols.items():
- if value["name"] == name:
- self.tunnel_tasks.append(
- asyncio.create_task(
- self.run_subprocess(value["command"]),
- name=value["name"],
- )
- )
- await asyncio.sleep(0)
-
- async def quit(self) -> None:
- """Cleanly quit the applicaiton"""
- # scheduler checks for this so stop making new tasks
- # when we want to quit
- self.are_we_dying = True
-
- for task in asyncio.all_tasks():
- task.cancel()
- await asyncio.sleep(0)
- try:
- await asyncio.gather(*asyncio.all_tasks())
- finally:
- sys.exit(0)
-
- async def scheduler(self) -> None:
- """schedulaer manages running the tests and reviving dead tunnels"""
- try:
- while True:
- if self.are_we_dying:
- return
- for key, value in self.scheduler_table.items():
- if value == 0 and key not in self.tunnel_test_tasks:
- tunnel_entry = self.data_cols[key]
- test_task = asyncio.create_task(
- asyncio.wait_for(
- self.run_test_coro(
- tunnel_entry["test_command"],
- tunnel_entry["name"],
- ),
- timeout=float(tunnel_entry["test_timeout"]),
- ),
- name=key,
- )
- self.tunnel_test_tasks.append(test_task)
- self.scheduler_table[key] = int(
- tunnel_entry["test_interval"]
- )
- await asyncio.sleep(0)
- else:
- self.scheduler_table[key] = (
- self.scheduler_table[key] - 1
- )
-
- # we are using a 1 second ticker. basically the scheduler
- # runs every second instead of as fast as it can
- await asyncio.sleep(1)
- except asyncio.CancelledError:
- pass
-
- async def tui_loop(self) -> None:
- """the tui loop"""
- sel: int = 0
- try:
- stdscr = curses_init()
- # we spawn the tunnels and the test scheduler put them
- # in the background and then run the TUI loop
- self.tunnel_tasks = await self.tunnel_procs()
- self.scheduler_task = asyncio.create_task(
- self.scheduler(), name="scheduler"
- )
- loop = asyncio.get_event_loop()
- loop.add_signal_handler(
- signal.SIGHUP,
- lambda: asyncio.create_task(self.sighup_handler()),
- )
-
- while True:
- stdscr.clear()
- render(self.data_cols, self.tunnel_tasks, stdscr, sel)
- char = stdscr.getch()
-
- if char == ord("j") or char == curses.KEY_DOWN:
- sel = (sel + 1) % len(self.data_cols)
- elif char == ord("k") or char == curses.KEY_UP:
- sel = (sel - 1) % len(self.data_cols)
- elif char == ord("g") or char == curses.KEY_UP:
- sel = 0
- elif char == ord("G") or char == curses.KEY_UP:
- sel = len(self.data_cols) - 1
- elif char == ord("r"):
- line_content = stdscr.instr(sel + 2, 1)
- await self.restart_task(line_content.decode("utf-8"))
- elif char == ord("q"):
- await self.quit()
- elif char == ord("s"):
- line_content = stdscr.instr(sel + 2, 1)
- await self.flip_task(line_content.decode("utf-8"))
-
- for task in self.tunnel_tasks:
- self.write_log(
- f"{task.get_name()} is {task.cancelled()} or {task.cancelling()}\n"
- )
-
- stdscr.refresh()
- await asyncio.sleep(0)
- finally:
- curses.nocbreak()
- stdscr.keypad(False)
- curses.echo()
- curses.endwin()
- # tasks = asyncio.all_tasks()
- # for task in tasks:
- # task.cancel()
- await self.quit()
-
-
-if __name__ == "__main__":
- tunnel_manager = TunnelManager()
- asyncio.run(tunnel_manager.tui_loop())