aboutsummaryrefslogblamecommitdiffstats
path: root/bin/tunneltop
blob: ebed5d9513d19f0152442ca5ec1c3d24dc14801f (plain) (tree)





































































































































































































































                                                                           
#!/usr/bin/env python
"""A top-like program for monitoring ssh tunnels"""

import argparse
import asyncio
import enum
import sys
import typing

import tomllib


class Argparser:  # pylint: disable=too-few-public-methods
    """Argparser class."""

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            "--config",
            "-c",
            type=str,
            help="the path to the .tunneltop.toml file",
            default="/home/devi/.tunneltop.toml",
        )
        self.parser.add_argument(
            "--noheader",
            "-n",
            type=bool,
            help="dont print the header",
            default=False,
        )
        self.parser.add_argument(
            "--delay",
            "-d",
            type=float,
            help="The delay between updates in seconds",
            default=5,
        )
        self.args = self.parser.parse_args()


# pylint: disable=too-few-public-methods
class Colors(enum.EnumType):
    """static color definitions"""

    purple = "\033[95m"
    blue = "\033[94m"
    green = "\033[92m"
    yellow = "\033[93m"
    red = "\033[91m"
    grey = "\033[1;37m"
    darkgrey = "\033[1;30m"
    cyan = "\033[1;36m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    blueblue = "\x1b[38;5;24m"
    greenie = "\x1b[38;5;23m"
    goo = "\x1b[38;5;22m"
    screen_clear = "\033c\033[3J"
    hide_cursor = "\033[?25l"


# pylint: disable=too-many-locals
def ffs(
    offset: int,
    header_list: typing.Optional[typing.List[str]],
    numbered: bool,
    *args,
) -> typing.List[str]:
    """A simple columnar printer"""
    max_column_width = []
    lines = []
    numbers_f: typing.List[int] = []
    dummy = []

    if sys.stdout.isatty():
        greenie = Colors.greenie
        bold = Colors.BOLD
        endc = Colors.ENDC
        goo = Colors.goo
        blueblue = Colors.blueblue
    else:
        greenie = ""
        bold = ""
        endc = ""
        goo = ""
        blueblue = ""

    for arg in args:
        max_column_width.append(max(len(repr(argette)) for argette in arg))

    if header_list is not None:
        if numbered:
            numbers_f.extend(range(1, len(args[-1]) + 1))
            max_column_width.append(
                max(len(repr(number)) for number in numbers_f)
            )
            header_list.insert(0, "idx")

        index = range(0, len(header_list))
        for header, width, i in zip(header_list, max_column_width, index):
            max_column_width[i] = max(len(header), width) + offset

        for i in index:
            dummy.append(
                greenie
                + bold
                + header_list[i].ljust(max_column_width[i])
                + endc
            )
        lines.append("".join(dummy))
        dummy.clear()

    index2 = range(0, len(args[-1]))
    for i in index2:
        if numbered:
            dummy.append(
                goo + bold + repr(i).ljust(max_column_width[0]) + endc
            )
            for arg, width in zip(args, max_column_width[1:]):
                dummy.append(blueblue + (arg[i]).ljust(width) + endc)
        else:
            for arg, width in zip(args, max_column_width):
                dummy.append(blueblue + (arg[i]).ljust(width) + endc)
        lines.append("".join(dummy))
        dummy.clear()
    return lines


async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]:
    """Run a command in a subshell"""
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    # return stdout and stderr
    return await proc.communicate()


def tunnel_test_callback(task: asyncio.Task) -> None:
    """Tunnel test callback function."""
    task_name = task.get_name()
    # data_cols["stdout"] = task.result()[0]
    # data_cols["stderr"] = task.result()[1]
    if (
        task.result()[0].decode("utf-8").strip("\n")
        == data_cols[task_name]["test_command_result"]
    ):
        data_cols[task_name]["status"] = "UP"
    else:
        data_cols[task_name]["status"] = "DOWN"


async def tunnel_test_procs() -> typing.List[asyncio.Task]:
    """run all the tunnel tests in the background as separate tasks"""
    tasks: typing.List[asyncio.Task] = []
    for _, value in data_cols.items():
        if value["test_command"] != "":
            tasks.append(
                asyncio.create_task(
                    run_subshell(value["test_command"]), name=value["name"]
                )
            )
            tasks[-1].add_done_callback(tunnel_test_callback)
            await asyncio.sleep(0)

    return tasks


async def tunnel_procs(commands: typing.List[str]) -> None:
    """run all the tunnels in the background as separate tasks"""
    for command in commands:
        asyncio.create_task(run_subshell(command))
        await asyncio.sleep(0)


data_cols: typing.Dict[str, typing.Dict] = {}


async def main() -> None:
    """entrypoint"""
    argparser = Argparser()
    print(Colors.screen_clear, end="")
    print(Colors.hide_cursor, end="")

    with open(argparser.args.config, "rb") as conf_file:
        data = tomllib.load(conf_file)
        for key, value in data.items():
            data_cols[key] = {
                "name": key,
                "address": value["address"],
                "port": value["port"],
                "command": value["command"],
                "status": "UNKN",
                "test_command": value["test_command"],
                "test_command_result": value["test_command_result"],
                "test_interval": value["test_interval"],
                "test_timeout": value["test_timeout"],
                "stdout": "",
                "stderr": "",
            }

        await tunnel_procs([v["command"] for _, v in data_cols.items()])

        while True:
            await tunnel_test_procs()
            lines = ffs(
                2,
                ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"]
                if not argparser.args.noheader
                else None,
                False,
                [v["name"] for _, v in data_cols.items()],
                [v["address"] for _, v in data_cols.items()],
                [repr(v["port"]) for _, v in data_cols.items()],
                [v["status"] for _, v in data_cols.items()],
                [v["stdout"] for _, v in data_cols.items()],
                [v["stderr"] for _, v in data_cols.items()],
            )
            for line in lines:
                print(line)

            await asyncio.sleep(argparser.args.delay)
            print(Colors.screen_clear, end="")
            print(Colors.hide_cursor, end="")


if __name__ == "__main__":
    asyncio.run(main())