#!/usr/bin/env python
"""A top-like program for monitoring ssh tunnels"""
# TODO- task cancellation is very slow as should be with tasks
import argparse
import asyncio
import copy
import curses
import enum
import os
import signal
import sys
import tomllib
import typing
class Argparser: # pylint: disable=too-few-public-methods
"""Argparser class."""
def __init__(self):
self.parser = argparse.ArgumentParser()
self.parser.add_argument(
"--config",
"-c",
type=str,
help="The path to the .tunneltop.toml file,"
" defaults to $HOME/.tunneltop.toml",
default="~/.tunneltop.toml",
)
self.parser.add_argument(
"--noheader",
"-n",
type=bool,
help="Dont print the header in the output",
default=False,
)
self.parser.add_argument(
"--delay",
"-d",
type=float,
help="The delay between redraws in seconds, defaults to 5 seconds",
default=5,
)
self.args = self.parser.parse_args()
# pylint: disable=too-few-public-methods
class Colors(enum.EnumType):
"""static color definitions"""
purple = "\033[95m"
blue = "\033[94m"
green = "\033[92m"
yellow = "\033[93m"
red = "\033[91m"
grey = "\033[1;37m"
darkgrey = "\033[1;30m"
cyan = "\033[1;36m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
blueblue = "\x1b[38;5;24m"
greenie = "\x1b[38;5;23m"
goo = "\x1b[38;5;22m"
screen_clear = "\033c\033[3J"
hide_cursor = "\033[?25l"
# pylint: disable=too-many-locals
def ffs(
offset: int,
header_list: typing.Optional[typing.List[str]],
numbered: bool,
no_color: bool,
*args,
) -> typing.List[str]:
"""A simple columnar printer"""
max_column_width = []
lines = []
numbers_f: typing.List[int] = []
dummy = []
if no_color or not sys.stdout.isatty():
greenie = ""
bold = ""
endc = ""
goo = ""
blueblue = ""
else:
greenie = Colors.greenie
bold = Colors.BOLD
endc = Colors.ENDC
goo = Colors.goo
blueblue = Colors.blueblue
for arg in args:
max_column_width.append(max(len(repr(argette)) for argette in arg))
if header_list is not None:
if numbered:
numbers_f.extend(range(1, len(args[-1]) + 1))
max_column_width.append(
max(len(repr(number)) for number in numbers_f)
)
header_list.insert(0, "idx")
index = range(0, len(header_list))
for header, width, i in zip(header_list, max_column_width, index):
max_column_width[i] = max(len(header), width) + offset
for i in index:
dummy.append(
greenie
+ bold
+ header_list[i].ljust(max_column_width[i])
+ endc
)
lines.append("".join(dummy))
dummy.clear()
index2 = range(0, len(args[-1]))
for i in index2:
if numbered:
dummy.append(
goo + bold + repr(i).ljust(max_column_width[0]) + endc
)
for arg, width in zip(args, max_column_width[1:]):
dummy.append(blueblue + (arg[i]).ljust(width) + endc)
else:
for arg, width in zip(args, max_column_width):
dummy.append(blueblue + (arg[i]).ljust(width) + endc)
lines.append("".join(dummy))
dummy.clear()
return lines
def render(
data_cols: typing.Dict[str, typing.Dict[str, str]],
tasks: typing.List[asyncio.Task],
stdscr,
sel: int,
):
"""Render the text"""
lines = ffs(
2,
["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"],
False,
True,
[v["name"] for _, v in data_cols.items()],
[v["address"] for _, v in data_cols.items()],
[repr(v["port"]) for _, v in data_cols.items()],
[v["status"] for _, v in data_cols.items()],
[v["stdout"] for _, v in data_cols.items()],
[v["stderr"] for _, v in data_cols.items()],
)
iterator = iter(lines)
stdscr.addstr(1, 1, lines[0], curses.color_pair(3))
next(iterator)
for i, line in enumerate(iterator):
try:
line_content = stdscr.instr(sel + 2, 1).decode("utf-8")
name: str = line_content[: line_content.find(" ")]
finally:
name = ""
if i == sel:
stdscr.addstr(
(2 + i) % (len(lines) + 1),
1,
line,
curses.color_pair(2)
if name not in tasks
else curses.color_pair(5),
)
else:
stdscr.addstr(
2 + i,
1,
line,
curses.color_pair(1)
if name not in tasks
else curses.color_pair(4),
)
stdscr.addstr("\n")
stdscr.box()
def curses_init():
"""Initialize ncurses"""
stdscr = curses.initscr()
curses.start_color()
curses.use_default_colors()
curses.curs_set(False)
curses.noecho()
curses.cbreak()
stdscr.keypad(True)
curses.halfdelay(20)
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN)
curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK)
curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_CYAN)
return stdscr
class TunnelManager:
"""The tunnel top class"""
def __init__(self):
self.argparser = Argparser()
self.data_cols: typing.Dict[
str, typing.Dict[str, str]
] = self.read_conf()
self.tunnel_tasks: typing.List[asyncio.Task] = []
self.tunnel_test_tasks: typing.List[asyncio.Task] = []
self.scheduler_task: asyncio.Task
self.scheduler_table: typing.Dict[
str, int
] = self.init_scheduler_table()
# we use this when its time to quit. this will prevent any
# new tasks from being scheduled
self.are_we_dying: bool = False
def init_scheduler_table(self) -> typing.Dict[str, int]:
"""initialize the scheduler table"""
result: typing.Dict[str, int] = {}
for key, value in self.data_cols.items():
if "test_interval" in value and value["test_command"] != "":
result[key] = 0
return result
async def stop_task(
self,
delete_task: asyncio.Task,
task_list: typing.List[asyncio.Task],
delete: bool = True,
):
"""Remove the reference"""
delete_index: int = -1
delete_task.cancel()
self.write_log(f"{delete_task.get_name()} is being cancelled\n")
await asyncio.sleep(0)
for i, task in enumerate(task_list):
if task.get_name() == delete_task.get_name():
delete_index = i
break
if delete and delete_index >= 0:
task_list.remove(self.tunnel_tasks[delete_index])
def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]:
"""Read the config file"""
data_cols: typing.Dict[str, typing.Dict[str, str]] = {}
with open(
os.path.expanduser(self.argparser.args.config), "rb"
) as conf_file:
data = tomllib.load(conf_file)
for key, value in data.items():
data_cols[key] = {
"name": key,
"address": value["address"],
"port": value["port"],
"command": value["command"],
"status": "UNKN",
"test_command": value["test_command"],
"test_command_result": value["test_command_result"],
"test_interval": value["test_interval"],
"test_timeout": value["test_timeout"],
"stdout": "",
"stderr": "",
}
return data_cols
async def run_subprocess(self, cmd: str) -> typing.Tuple[bytes, bytes]:
"""Run a command"""
try:
proc = await asyncio.create_subprocess_exec(
*cmd.split(" "),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
return await proc.communicate()
except asyncio.TimeoutError:
proc.terminate()
return (bytes(), bytes())
except asyncio.CancelledError:
proc.terminate()
raise
async def run_test_coro(
self, cmd: str, task_name: str
) -> typing.Tuple[bytes, bytes]:
"""Run a test command"""
try:
stdout, stderr = await self.run_subprocess(cmd)
stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"')
stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"')
self.data_cols[task_name]["stdout"] = stdout_str
self.data_cols[task_name]["stderr"] = stderr_str
if stdout_str == self.data_cols[task_name]["test_command_result"]:
self.data_cols[task_name]["status"] = "UP"
else:
self.data_cols[task_name]["status"] = "DOWN"
return stdout, stderr
except asyncio.TimeoutError:
self.data_cols[task_name]["status"] = "TMOUT"
raise
async def tunnel_procs(
self,
) -> typing.List[asyncio.Task]:
"""run all the tunnels in the background as separate tasks"""
tasks: typing.List[asyncio.Task] = []
for _, value in self.data_cols.items():
tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]), name=value["name"]
),
)
await asyncio.sleep(0)
return tasks
async def sighup_handler_async_worker(self, data_cols_new) -> None:
"""Handles the actual updating of tasks when we get SIGTERM"""
delete_task: typing.Optional[asyncio.Task] = None
for k, value in data_cols_new.items():
if k not in self.data_cols:
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]), name=k
)
)
await asyncio.sleep(0)
self.data_cols[k] = copy.deepcopy(value)
if k in self.scheduler_table:
self.scheduler_table[k] = 0
else:
if (
self.data_cols[k]["command"] != data_cols_new[k]["command"]
or self.data_cols[k]["port"] != data_cols_new[k]["port"]
or self.data_cols[k]["address"]
!= data_cols_new[k]["address"]
):
for task in self.tunnel_tasks:
if task.get_name() == k:
delete_task = task
break
# task.cancel()
# await asyncio.sleep(0)
if delete_task is not None:
await self.stop_task(delete_task, self.tunnel_tasks)
delete_task = None
self.data_cols[k] = copy.deepcopy(data_cols_new[k])
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]), name=k
)
)
if k in self.scheduler_table:
self.scheduler_table[k] = 0
await asyncio.sleep(0)
for k, _ in self.data_cols.items():
if k not in data_cols_new:
for task in self.tunnel_tasks:
if task.get_name() == k:
# task.cancel()
# await asyncio.sleep(0)
delete_task = task
break
if delete_task is not None:
await self.stop_task(delete_task, self.tunnel_tasks)
delete_task = None
del self.data_cols[k]
if k in self.scheduler_table:
del self.scheduler_table[k]
async def sighup_handler(self) -> None:
"""SIGHUP handler. we want to reload the config."""
# type: ignore # pylint: disable=E0203
data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {}
data_cols_new = self.read_conf()
await self.sighup_handler_async_worker(data_cols_new)
def write_log(self, log: str):
"""A simple logger"""
with open(
"/home/devi/devi/abbatoir/hole15/log",
"a",
encoding="utf-8",
) as logfile:
logfile.write(log)
async def restart_task(self, line_content: str) -> None:
"""restart a task"""
name: str = line_content[: line_content.find(" ")]
# was_cancelled: bool = False
for task in self.tunnel_tasks:
if task.get_name() == name:
# was_cancelled = task.cancel()
# self.write_log(f"was_cancelled: {was_cancelled}")
await self.stop_task(task, self.tunnel_tasks)
# await task
# await asyncio.sleep(0)
for _, value in self.data_cols.items():
if value["name"] == name and task.cancelled():
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]),
name=value["name"],
)
)
await asyncio.sleep(0)
async def flip_task(self, line_content: str) -> None:
"""flip a task"""
name: str = line_content[: line_content.find(" ")]
# was_cancelled: bool = False
was_active: bool = False
for task in self.tunnel_tasks:
if task.get_name() == name:
await self.stop_task(task, self.tunnel_tasks)
# was_cancelled = task.cancel()
# await asyncio.sleep(0)
# self.write_log(f"was_cancelled: {was_cancelled}")
# await task
was_active = True
break
if not was_active:
for _, value in self.data_cols.items():
if value["name"] == name:
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]),
name=value["name"],
)
)
await asyncio.sleep(0)
async def quit(self) -> None:
"""Cleanly quit the applicaiton"""
# scheduler checks for this so stop making new tasks
# when we want to quit
self.are_we_dying = True
for task in asyncio.all_tasks():
task.cancel()
await asyncio.sleep(0)
try:
await asyncio.gather(*asyncio.all_tasks())
finally:
sys.exit(0)
async def scheduler(self) -> None:
"""schedulaer manages running the tests and reviving dead tunnels"""
try:
while True:
if self.are_we_dying:
return
for key, value in self.scheduler_table.items():
if value == 0 and key not in self.tunnel_test_tasks:
tunnel_entry = self.data_cols[key]
test_task = asyncio.create_task(
asyncio.wait_for(
self.run_test_coro(
tunnel_entry["test_command"],
tunnel_entry["name"],
),
timeout=float(tunnel_entry["test_timeout"]),
),
name=key,
)
self.tunnel_test_tasks.append(test_task)
self.scheduler_table[key] = int(
tunnel_entry["test_interval"]
)
await asyncio.sleep(0)
else:
self.scheduler_table[key] = (
self.scheduler_table[key] - 1
)
# we are using a 1 second ticker. basically the scheduler
# runs every second instead of as fast as it can
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
async def tui_loop(self) -> None:
"""the tui loop"""
sel: int = 0
try:
stdscr = curses_init()
# we spawn the tunnels and the test scheduler put them
# in the background and then run the TUI loop
self.tunnel_tasks = await self.tunnel_procs()
self.scheduler_task = asyncio.create_task(
self.scheduler(), name="scheduler"
)
loop = asyncio.get_event_loop()
loop.add_signal_handler(
signal.SIGHUP,
lambda: asyncio.create_task(self.sighup_handler()),
)
while True:
stdscr.clear()
render(self.data_cols, self.tunnel_tasks, stdscr, sel)
char = stdscr.getch()
if char == ord("j") or char == curses.KEY_DOWN:
sel = (sel + 1) % len(self.data_cols)
elif char == ord("k") or char == curses.KEY_UP:
sel = (sel - 1) % len(self.data_cols)
elif char == ord("g") or char == curses.KEY_UP:
sel = 0
elif char == ord("G") or char == curses.KEY_UP:
sel = len(self.data_cols) - 1
elif char == ord("r"):
line_content = stdscr.instr(sel + 2, 1)
await self.restart_task(line_content.decode("utf-8"))
elif char == ord("q"):
await self.quit()
elif char == ord("s"):
line_content = stdscr.instr(sel + 2, 1)
await self.flip_task(line_content.decode("utf-8"))
for task in self.tunnel_tasks:
self.write_log(
f"{task.get_name()} is {task.cancelled()} or {task.cancelling()}\n"
)
stdscr.refresh()
await asyncio.sleep(0)
finally:
curses.nocbreak()
stdscr.keypad(False)
curses.echo()
curses.endwin()
# tasks = asyncio.all_tasks()
# for task in tasks:
# task.cancel()
await self.quit()
if __name__ == "__main__":
tunnel_manager = TunnelManager()
asyncio.run(tunnel_manager.tui_loop())