diff options
| author | terminaldweller <devi@terminaldweller.com> | 2023-09-02 18:59:30 +0000 | 
|---|---|---|
| committer | terminaldweller <devi@terminaldweller.com> | 2023-09-02 18:59:30 +0000 | 
| commit | 01a7f64f93531fa0c39c784ffa8313f0940f896c (patch) | |
| tree | de1fedc65e3f1cc42f9d0eead96a3804a8747c2b | |
| parent | added docker (diff) | |
| download | magni-01a7f64f93531fa0c39c784ffa8313f0940f896c.tar.gz magni-01a7f64f93531fa0c39c784ffa8313f0940f896c.zip | |
Diffstat (limited to '')
| -rwxr-xr-x | magni.py | 76 | 
1 files changed, 43 insertions, 33 deletions
| @@ -5,6 +5,7 @@ import argparse  import asyncio  import concurrent.futures  import http.server +import json  import os  import random  import secrets @@ -44,27 +45,39 @@ class Argparser:  # pylint: disable=too-few-public-methods              help="the port to serve the images over",              default=8086,          ) +        self.parser.add_argument( +            "--headers", +            type=str, +            help="a file containing the headers for the GET requests", +            default="", +        )          self.args = self.parser.parse_args() -def get_manganato_headers(url: str) -> typing.Dict[str, str]: +def get_headers(url: str) -> typing.Dict[str, str]:      """Sets the ncessary headers.""" -    headers = { -        "Accept": "image/avif,image/webp,*/*", -        "Accept-Language": "en-US,en;q=0.5", -        "Accept-Encoding": "gzip, deflate, br", -        "DNT": "1", -        "Connection": "keep-alive", -        "Sec-Fetch-Dest": "image", -        "Sec-Fetch-Mode": "no-cors", -        "Sec-Fetch-Site": "cross-site", -        "Sec-GPC": "1", -        "Pragma": "no-cache", -        "Cache-Control": "no-cache", -        "TE": "trailers", -        "Referer": url, -        "User-Agent": get_user_agent(), -    } +    headers: typing.Dict = {} + +    if argparser.args.headers != "": +        with open(argparser.args.headers, "r") as headers_file: +            headers = json.load(headers_file) +    else: +        headers = { +            "Accept": "image/avif,image/webp,*/*", +            "Accept-Language": "en-US,en;q=0.5", +            "Accept-Encoding": "gzip, deflate, br", +            "DNT": "1", +            "Connection": "keep-alive", +            "Sec-Fetch-Dest": "image", +            "Sec-Fetch-Mode": "no-cors", +            "Sec-Fetch-Site": "cross-site", +            "Sec-GPC": "1", +            "Pragma": "no-cache", +            "Cache-Control": "no-cache", +            "TE": "trailers", +            "Referer": url, +            "User-Agent": get_user_agent(), +        }      return headers @@ -153,7 +166,7 @@ def single_get(url: str) -> requests.Response:          allow_redirects=True,          timeout=10,          proxies=get_proxies(), -        headers=get_manganato_headers(url), +        headers=get_headers(url),      ) @@ -172,7 +185,7 @@ def single_get_tag(url_tag_pair: list) -> typing.Tuple[requests.Response, str]:              allow_redirects=True,              timeout=10,              proxies=get_proxies(), -            headers=get_manganato_headers(url_tag_pair[0]), +            headers=get_headers(url_tag_pair[0]),          ),          url_tag_pair[1],      ) @@ -227,9 +240,9 @@ async def model_downloader() -> typing.Optional[typing.List[typing.Tuple[str, st      return url_tag_list -async def download_all_images(url: str) -> typing.Optional[typing.List[str]]: +async def download_all_images() -> typing.Optional[typing.List[str]]:      """Sniffs images.""" -    response = requests.get(url, timeout=10, allow_redirects=True) +    response = requests.get(argparser.args.url, timeout=10, allow_redirects=True)      if response.content is None:          return None @@ -259,24 +272,20 @@ async def download_all_images(url: str) -> typing.Optional[typing.List[str]]:      return image_name_list -def superres_images(image_list: typing.List[str], method: str) -> None: +def superres_images(image_list: typing.List[str]) -> None:      """Superscales the images."""      for image in image_list:          img = cv2.imread(get_image_path() + "/" + image) -        if method == "espcn": +        if argparser.args.method == "espcn":              result = espcn_superscaler(img) -        elif method == "fsrcnn": +        elif argparser.args.method == "fsrcnn":              result = fsrcnn_superscaler(img)          cv2.imwrite(get_image_path() + "/" + image, result) -async def handle_downloads( -    argparser: Argparser, -) -> typing.Optional[typing.List[str]]: +async def handle_downloads() -> typing.Optional[typing.List[str]]:      """Download the models and the images.""" -    _, image_name_list = await asyncio.gather( -        model_downloader(), download_all_images(argparser.args.url) -    ) +    _, image_name_list = await asyncio.gather(model_downloader(), download_all_images())      return image_name_list @@ -314,11 +323,10 @@ def serve(port: int) -> None:  def main() -> None:      """Entry point.""" -    argparser = Argparser() -    image_name_list = asyncio.run(handle_downloads(argparser)) +    image_name_list = asyncio.run(handle_downloads())      if image_name_list is not None: -        superres_images(image_name_list, argparser.args.method) +        superres_images(image_name_list)          print("finished superresing images.")      else:          print("failed to download all images.", file=sys.stderr) @@ -327,5 +335,7 @@ def main() -> None:      serve(argparser.args.port) +argparser = Argparser() +  if __name__ == "__main__":      main() | 
