diff options
Diffstat (limited to 'bin')
| -rwxr-xr-x | bin/tunneltop | 230 | ||||
| -rwxr-xr-x | bin/virttop | 2 | 
2 files changed, 231 insertions, 1 deletions
| diff --git a/bin/tunneltop b/bin/tunneltop new file mode 100755 index 0000000..ebed5d9 --- /dev/null +++ b/bin/tunneltop @@ -0,0 +1,230 @@ +#!/usr/bin/env python +"""A top-like program for monitoring ssh tunnels""" + +import argparse +import asyncio +import enum +import sys +import typing + +import tomllib + + +class Argparser:  # pylint: disable=too-few-public-methods +    """Argparser class.""" + +    def __init__(self): +        self.parser = argparse.ArgumentParser() +        self.parser.add_argument( +            "--config", +            "-c", +            type=str, +            help="the path to the .tunneltop.toml file", +            default="/home/devi/.tunneltop.toml", +        ) +        self.parser.add_argument( +            "--noheader", +            "-n", +            type=bool, +            help="dont print the header", +            default=False, +        ) +        self.parser.add_argument( +            "--delay", +            "-d", +            type=float, +            help="The delay between updates in seconds", +            default=5, +        ) +        self.args = self.parser.parse_args() + + +# pylint: disable=too-few-public-methods +class Colors(enum.EnumType): +    """static color definitions""" + +    purple = "\033[95m" +    blue = "\033[94m" +    green = "\033[92m" +    yellow = "\033[93m" +    red = "\033[91m" +    grey = "\033[1;37m" +    darkgrey = "\033[1;30m" +    cyan = "\033[1;36m" +    ENDC = "\033[0m" +    BOLD = "\033[1m" +    UNDERLINE = "\033[4m" +    blueblue = "\x1b[38;5;24m" +    greenie = "\x1b[38;5;23m" +    goo = "\x1b[38;5;22m" +    screen_clear = "\033c\033[3J" +    hide_cursor = "\033[?25l" + + +# pylint: disable=too-many-locals +def ffs( +    offset: int, +    header_list: typing.Optional[typing.List[str]], +    numbered: bool, +    *args, +) -> typing.List[str]: +    """A simple columnar printer""" +    max_column_width = [] +    lines = [] +    numbers_f: typing.List[int] = [] +    dummy = [] + +    if sys.stdout.isatty(): +        greenie = Colors.greenie +        bold = Colors.BOLD +        endc = Colors.ENDC +        goo = Colors.goo +        blueblue = Colors.blueblue +    else: +        greenie = "" +        bold = "" +        endc = "" +        goo = "" +        blueblue = "" + +    for arg in args: +        max_column_width.append(max(len(repr(argette)) for argette in arg)) + +    if header_list is not None: +        if numbered: +            numbers_f.extend(range(1, len(args[-1]) + 1)) +            max_column_width.append( +                max(len(repr(number)) for number in numbers_f) +            ) +            header_list.insert(0, "idx") + +        index = range(0, len(header_list)) +        for header, width, i in zip(header_list, max_column_width, index): +            max_column_width[i] = max(len(header), width) + offset + +        for i in index: +            dummy.append( +                greenie +                + bold +                + header_list[i].ljust(max_column_width[i]) +                + endc +            ) +        lines.append("".join(dummy)) +        dummy.clear() + +    index2 = range(0, len(args[-1])) +    for i in index2: +        if numbered: +            dummy.append( +                goo + bold + repr(i).ljust(max_column_width[0]) + endc +            ) +            for arg, width in zip(args, max_column_width[1:]): +                dummy.append(blueblue + (arg[i]).ljust(width) + endc) +        else: +            for arg, width in zip(args, max_column_width): +                dummy.append(blueblue + (arg[i]).ljust(width) + endc) +        lines.append("".join(dummy)) +        dummy.clear() +    return lines + + +async def run_subshell(cmd: str) -> typing.Tuple[bytes, bytes]: +    """Run a command in a subshell""" +    proc = await asyncio.create_subprocess_shell( +        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE +    ) + +    # return stdout and stderr +    return await proc.communicate() + + +def tunnel_test_callback(task: asyncio.Task) -> None: +    """Tunnel test callback function.""" +    task_name = task.get_name() +    # data_cols["stdout"] = task.result()[0] +    # data_cols["stderr"] = task.result()[1] +    if ( +        task.result()[0].decode("utf-8").strip("\n") +        == data_cols[task_name]["test_command_result"] +    ): +        data_cols[task_name]["status"] = "UP" +    else: +        data_cols[task_name]["status"] = "DOWN" + + +async def tunnel_test_procs() -> typing.List[asyncio.Task]: +    """run all the tunnel tests in the background as separate tasks""" +    tasks: typing.List[asyncio.Task] = [] +    for _, value in data_cols.items(): +        if value["test_command"] != "": +            tasks.append( +                asyncio.create_task( +                    run_subshell(value["test_command"]), name=value["name"] +                ) +            ) +            tasks[-1].add_done_callback(tunnel_test_callback) +            await asyncio.sleep(0) + +    return tasks + + +async def tunnel_procs(commands: typing.List[str]) -> None: +    """run all the tunnels in the background as separate tasks""" +    for command in commands: +        asyncio.create_task(run_subshell(command)) +        await asyncio.sleep(0) + + +data_cols: typing.Dict[str, typing.Dict] = {} + + +async def main() -> None: +    """entrypoint""" +    argparser = Argparser() +    print(Colors.screen_clear, end="") +    print(Colors.hide_cursor, end="") + +    with open(argparser.args.config, "rb") as conf_file: +        data = tomllib.load(conf_file) +        for key, value in data.items(): +            data_cols[key] = { +                "name": key, +                "address": value["address"], +                "port": value["port"], +                "command": value["command"], +                "status": "UNKN", +                "test_command": value["test_command"], +                "test_command_result": value["test_command_result"], +                "test_interval": value["test_interval"], +                "test_timeout": value["test_timeout"], +                "stdout": "", +                "stderr": "", +            } + +        await tunnel_procs([v["command"] for _, v in data_cols.items()]) + +        while True: +            await tunnel_test_procs() +            lines = ffs( +                2, +                ["NAME", "ADDRESS", "PORT", "STATUS", "STDOUT", "STDERR"] +                if not argparser.args.noheader +                else None, +                False, +                [v["name"] for _, v in data_cols.items()], +                [v["address"] for _, v in data_cols.items()], +                [repr(v["port"]) for _, v in data_cols.items()], +                [v["status"] for _, v in data_cols.items()], +                [v["stdout"] for _, v in data_cols.items()], +                [v["stderr"] for _, v in data_cols.items()], +            ) +            for line in lines: +                print(line) + +            await asyncio.sleep(argparser.args.delay) +            print(Colors.screen_clear, end="") +            print(Colors.hide_cursor, end="") + + +if __name__ == "__main__": +    asyncio.run(main()) diff --git a/bin/virttop b/bin/virttop index 61dfd04..24c4bb8 100755 --- a/bin/virttop +++ b/bin/virttop @@ -152,7 +152,7 @@ def ffs(      header_list: typing.Optional[typing.List[str]],      numbered: bool,      *args, -): +) -> typing.List[str]:      """A simple columnar printer"""      max_column_width = []      lines = [] | 
