#!/usr/bin/env python
"""A top-like program for monitoring ssh tunnels or any tunnels"""
# TODO- the disabled coloring is not working
# TODO- quit doesnt work
# TODO- we are reviving dead tunnels
import argparse
import asyncio
import copy
import curses
import enum
import os
import signal
import sys
import tomllib
import typing
class Argparser: # pylint: disable=too-few-public-methods
"""Argparser class."""
def __init__(self):
self.parser = argparse.ArgumentParser()
self.parser.add_argument(
"--config",
"-c",
type=str,
help="The path to the .tunneltop.toml file,"
" defaults to $HOME/.tunneltop.toml",
default="~/.tunneltop.toml",
)
self.parser.add_argument(
"--noheader",
"-n",
type=bool,
help="Dont print the header in the output",
default=False,
)
self.parser.add_argument(
"--delay",
"-d",
type=float,
help="The delay between redraws in seconds, defaults to 5 seconds",
default=5,
)
self.args = self.parser.parse_args()
# pylint: disable=too-few-public-methods
class Colors(enum.EnumType):
"""static color definitions"""
purple = "\033[95m"
blue = "\033[94m"
green = "\033[92m"
yellow = "\033[93m"
red = "\033[91m"
grey = "\033[1;37m"
darkgrey = "\033[1;30m"
cyan = "\033[1;36m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
blueblue = "\x1b[38;5;24m"
greenie = "\x1b[38;5;23m"
goo = "\x1b[38;5;22m"
screen_clear = "\033c\033[3J"
hide_cursor = "\033[?25l"
# pylint: disable=too-many-locals
def ffs(
offset: int,
header_list: typing.Optional[typing.List[str]],
numbered: bool,
no_color: bool,
*args,
) -> typing.List[str]:
"""A simple columnar printer"""
max_column_width = []
lines = []
numbers_f: typing.List[int] = []
dummy = []
if no_color or not sys.stdout.isatty():
greenie = ""
bold = ""
endc = ""
goo = ""
blueblue = ""
else:
greenie = Colors.greenie
bold = Colors.BOLD
endc = Colors.ENDC
goo = Colors.goo
blueblue = Colors.blueblue
for arg in args:
max_column_width.append(max(len(repr(argette)) for argette in arg))
if header_list is not None:
if numbered:
numbers_f.extend(range(1, len(args[-1]) + 1))
max_column_width.append(
max(len(repr(number)) for number in numbers_f)
)
header_list.insert(0, "idx")
index = range(0, len(header_list))
for header, width, i in zip(header_list, max_column_width, index):
max_column_width[i] = max(len(header), width) + offset
for i in index:
dummy.append(
greenie
+ bold
+ header_list[i].ljust(max_column_width[i])
+ endc
)
lines.append("".join(dummy))
dummy.clear()
index2 = range(0, len(args[-1]))
for i in index2:
if numbered:
dummy.append(
goo + bold + repr(i).ljust(max_column_width[0]) + endc
)
for arg, width in zip(args, max_column_width[1:]):
dummy.append(blueblue + (arg[i]).ljust(width) + endc)
else:
for arg, width in zip(args, max_column_width):
dummy.append(blueblue + (arg[i]).ljust(width) + endc)
lines.append("".join(dummy))
dummy.clear()
return lines
def render(
data_cols: typing.Dict[str, typing.Dict[str, str]],
tasks: typing.List[asyncio.Task],
stdscr,
sel: int,
):
"""Render the text"""
lines = ffs(
2,
["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"],
False,
True,
[v["name"] for _, v in data_cols.items()],
[v["address"] for _, v in data_cols.items()],
[repr(v["port"]) for _, v in data_cols.items()],
[v["status"] for _, v in data_cols.items()],
[v["stdout"] for _, v in data_cols.items()],
[v["stderr"] for _, v in data_cols.items()],
)
iterator = iter(lines)
stdscr.addstr(1, 1, lines[0], curses.color_pair(3))
next(iterator)
for i, line in enumerate(iterator):
try:
line_content = stdscr.instr(sel + 2, 1).decode("utf-8")
name: str = line_content[: line_content.find(" ")]
finally:
name = ""
if i == sel:
stdscr.addstr(
(2 + i) % (len(lines) + 1),
1,
line,
curses.color_pair(2)
if name not in tasks
else curses.color_pair(5),
)
else:
stdscr.addstr(
2 + i,
1,
line,
curses.color_pair(1)
if name not in tasks
else curses.color_pair(4),
)
stdscr.addstr("\n")
stdscr.box()
def curses_init():
"""Initialize ncurses"""
stdscr = curses.initscr()
curses.start_color()
curses.use_default_colors()
curses.curs_set(False)
curses.noecho()
curses.cbreak()
stdscr.keypad(True)
curses.halfdelay(20)
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_GREEN)
curses.init_pair(3, curses.COLOR_BLUE, curses.COLOR_BLACK)
curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_BLACK, curses.COLOR_CYAN)
return stdscr
class TunnelManager:
"""The tunnel top class"""
def __init__(self):
self.argparser = Argparser()
self.data_cols: typing.Dict[
str, typing.Dict[str, str]
] = self.read_conf()
self.tunnel_tasks: typing.List[asyncio.Task] = []
self.tunnel_test_tasks: typing.List[asyncio.Task] = []
self.scheduler_task: asyncio.Task
self.scheduler_table: typing.Dict[
str, int
] = self.init_scheduler_table()
# we use this when its time to quit. this will prevent any
# new tasks from being scheduled
self.are_we_dying: bool = False
def init_scheduler_table(self) -> typing.Dict[str, int]:
"""initialize the scheduler table"""
result: typing.Dict[str, int] = {}
for key, value in self.data_cols.items():
if "test_interval" in value and value["test_command"] != "":
result[key] = 0
return result
async def stop_task(
self,
delete_task: asyncio.Task,
task_list: typing.List[asyncio.Task],
delete: bool = True,
):
"""Remove the reference"""
delete_index: int = -1
delete_task.cancel()
self.write_log(f"{delete_task.get_name()} is being cancelled\n")
await asyncio.sleep(0)
for i, task in enumerate(task_list):
if task.get_name() == delete_task.get_name():
delete_index = i
break
if delete and delete_index >= 0:
task_list.remove(self.tunnel_tasks[delete_index])
def read_conf(self) -> typing.Dict[str, typing.Dict[str, str]]:
"""Read the config file"""
data_cols: typing.Dict[str, typing.Dict[str, str]] = {}
with open(
os.path.expanduser(self.argparser.args.config), "rb"
) as conf_file:
data = tomllib.load(conf_file)
for key, value in data.items():
data_cols[key] = {
"name": key,
"address": value["address"],
"port": value["port"],
"command": value["command"],
"status": "UNKN",
"test_command": value["test_command"],
"test_command_result": value["test_command_result"],
"test_interval": value["test_interval"],
"test_timeout": value["test_timeout"],
"stdout": "",
"stderr": "",
}
return data_cols
async def run_subprocess(self, cmd: str) -> typing.Tuple[bytes, bytes]:
"""Run a command"""
try:
proc = await asyncio.create_subprocess_exec(
*cmd.split(" "),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
return await proc.communicate()
except asyncio.TimeoutError:
proc.terminate()
raise
except asyncio.CancelledError:
proc.terminate()
raise
async def run_test_coro(
self, cmd: str, task_name: str
) -> typing.Tuple[bytes, bytes]:
"""Run a test command"""
try:
stdout, stderr = await self.run_subprocess(cmd)
stdout_str: str = stdout.decode("utf-8").strip("\n").strip('"')
stderr_str: str = stderr.decode("utf-8").strip("\n").strip('"')
self.data_cols[task_name]["stdout"] = stdout_str
self.data_cols[task_name]["stderr"] = stderr_str
if stdout_str == self.data_cols[task_name]["test_command_result"]:
self.data_cols[task_name]["status"] = "UP"
else:
self.data_cols[task_name]["status"] = "DOWN"
return stdout, stderr
except asyncio.TimeoutError:
self.data_cols[task_name]["status"] = "TMOUT"
self.data_cols[task_name]["stdout"] = "-"
self.data_cols[task_name]["stderr"] = "-"
raise
async def tunnel_procs(
self,
) -> typing.List[asyncio.Task]:
"""run all the tunnels in the background as separate subprocesses"""
tasks: typing.List[asyncio.Task] = []
for _, value in self.data_cols.items():
tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]), name=value["name"]
),
)
await asyncio.sleep(0)
return tasks
async def sighup_handler_async_worker(self, data_cols_new) -> None:
"""Handles the actual updating of tasks when we get SIGTERM"""
delete_task: typing.Optional[asyncio.Task] = None
for k, value in data_cols_new.items():
if k not in self.data_cols:
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]), name=k
)
)
await asyncio.sleep(0)
self.data_cols[k] = copy.deepcopy(value)
if k in self.scheduler_table:
self.scheduler_table[k] = 0
else:
if (
self.data_cols[k]["command"] != data_cols_new[k]["command"]
or self.data_cols[k]["port"] != data_cols_new[k]["port"]
or self.data_cols[k]["address"]
!= data_cols_new[k]["address"]
):
for task in self.tunnel_tasks:
if task.get_name() == k:
delete_task = task
break
# task.cancel()
# await asyncio.sleep(0)
if delete_task is not None:
await self.stop_task(delete_task, self.tunnel_tasks)
delete_task = None
self.data_cols[k] = copy.deepcopy(data_cols_new[k])
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]), name=k
)
)
if k in self.scheduler_table:
self.scheduler_table[k] = 0
await asyncio.sleep(0)
for k, _ in self.data_cols.items():
if k not in data_cols_new:
for task in self.tunnel_tasks:
if task.get_name() == k:
# task.cancel()
# await asyncio.sleep(0)
delete_task = task
break
if delete_task is not None:
await self.stop_task(delete_task, self.tunnel_tasks)
delete_task = None
del self.data_cols[k]
if k in self.scheduler_table:
del self.scheduler_table[k]
async def sighup_handler(self) -> None:
"""SIGHUP handler. we want to reload the config."""
# type: ignore # pylint: disable=E0203
data_cols_new: typing.Dict[str, typing.Dict[str, str]] = {}
data_cols_new = self.read_conf()
await self.sighup_handler_async_worker(data_cols_new)
def write_log(self, log: str):
"""A simple logger"""
with open(
"/home/devi/devi/abbatoir/hole15/log",
"a",
encoding="utf-8",
) as logfile:
logfile.write(log)
async def restart_task(self, line_content: str) -> None:
"""restart a task"""
name: str = line_content[: line_content.find(" ")]
# was_cancelled: bool = False
for task in self.tunnel_tasks:
if task.get_name() == name:
# was_cancelled = task.cancel()
# self.write_log(f"was_cancelled: {was_cancelled}")
await self.stop_task(task, self.tunnel_tasks)
# await task
# await asyncio.sleep(0)
for _, value in self.data_cols.items():
if value["name"] == name and task.cancelled():
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]),
name=value["name"],
)
)
await asyncio.sleep(0)
async def flip_task(self, line_content: str) -> None:
"""flip a task"""
name: str = line_content[: line_content.find(" ")]
# was_cancelled: bool = False
was_active: bool = False
for task in self.tunnel_tasks:
if task.get_name() == name:
await self.stop_task(task, self.tunnel_tasks)
# was_cancelled = task.cancel()
# await asyncio.sleep(0)
# self.write_log(f"was_cancelled: {was_cancelled}")
# await task
was_active = True
break
if not was_active:
for _, value in self.data_cols.items():
if value["name"] == name:
self.tunnel_tasks.append(
asyncio.create_task(
self.run_subprocess(value["command"]),
name=value["name"],
)
)
await asyncio.sleep(0)
async def quit(self) -> None:
"""Cleanly quit the applicaiton"""
# scheduler checks for this to stop running new tests
# when we want to quit
self.are_we_dying = True
for task in asyncio.all_tasks():
task.cancel()
await asyncio.sleep(0)
try:
await asyncio.gather(*asyncio.all_tasks())
finally:
sys.exit(0)
async def scheduler(self) -> None:
"""scheduler manages running the tests and reviving dead tunnels"""
try:
while True:
if self.are_we_dying:
return
for key, value in self.scheduler_table.items():
if value == 0 and key not in self.tunnel_test_tasks:
tunnel_entry = self.data_cols[key]
test_task = asyncio.create_task(
asyncio.wait_for(
self.run_test_coro(
tunnel_entry["test_command"],
tunnel_entry["name"],
),
timeout=float(tunnel_entry["test_timeout"]),
),
name=key,
)
self.tunnel_test_tasks.append(test_task)
self.scheduler_table[key] = int(
tunnel_entry["test_interval"]
)
await asyncio.sleep(0)
else:
self.scheduler_table[key] = (
self.scheduler_table[key] - 1
)
# we are using a 1 second ticker. basically the scheduler
# runs every second instead of as fast as it can
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
async def tui_loop(self) -> None:
"""the tui loop"""
sel: int = 0
try:
stdscr = curses_init()
# we spawn the tunnels and the test scheduler put them
# in the background and then run the TUI loop
self.tunnel_tasks = await self.tunnel_procs()
self.scheduler_task = asyncio.create_task(
self.scheduler(), name="scheduler"
)
loop = asyncio.get_event_loop()
loop.add_signal_handler(
signal.SIGHUP,
lambda: asyncio.create_task(self.sighup_handler()),
)
while True:
stdscr.clear()
render(self.data_cols, self.tunnel_tasks, stdscr, sel)
char = stdscr.getch()
if char == ord("j") or char == curses.KEY_DOWN:
sel = (sel + 1) % len(self.data_cols)
elif char == ord("k") or char == curses.KEY_UP:
sel = (sel - 1) % len(self.data_cols)
elif char == ord("g") or char == curses.KEY_UP:
sel = 0
elif char == ord("G") or char == curses.KEY_UP:
sel = len(self.data_cols) - 1
elif char == ord("r"):
line_content = stdscr.instr(sel + 2, 1)
await self.restart_task(line_content.decode("utf-8"))
elif char == ord("q"):
await self.quit()
elif char == ord("s"):
line_content = stdscr.instr(sel + 2, 1)
await self.flip_task(line_content.decode("utf-8"))
stdscr.refresh()
await asyncio.sleep(0)
finally:
curses.nocbreak()
stdscr.keypad(False)
curses.echo()
curses.endwin()
# tasks = asyncio.all_tasks()
# for task in tasks:
# task.cancel()
await self.quit()
def main() -> None:
"""entry point"""
tunnel_manager = TunnelManager()
asyncio.run(tunnel_manager.tui_loop())
if __name__ == "__main__":
main()