diff options
Diffstat (limited to 'magni.py')
-rwxr-xr-x | magni.py | 76 |
1 files changed, 64 insertions, 12 deletions
@@ -2,14 +2,17 @@ """Magni.""" import argparse +import asyncio import concurrent.futures import http.server import os +import random import socketserver import typing import bs4 import cv2 # type:ignore +import playwright import requests @@ -37,18 +40,28 @@ class Argparser: # pylint: disable=too-few-public-methods def get_model_path() -> str: """Get the model path.""" - # FIXME- add path if it doesnt exist + model_path: str = "" if "MAGNI_MODEL_PATH" in os.environ and os.environ["MAGNI_MODEL_PATH"]: - return os.environ["MAGNI_MODEL_PATH"] - return "./models" + model_path = os.environ["MAGNI_MODEL_PATH"] + else: + model_path = "./models" + if not os.path.exists(model_path): + os.makedirs(model_path) + return model_path def get_image_path() -> str: """Get the image path.""" - # FIXME- add path if it doesnt exist + image_path: str = "" if "MAGNI_IMAGE_PATH" in os.environ and os.environ["MAGNI_IMAGE_PATH"]: - return os.environ["MAGNI_IMAGE_PATH"] - return "./images" + image_path = os.environ["MAGNI_IMAGE_PATH"] + else: + image_path = "./images" + + if not os.path.exists(image_path): + os.makedirs(image_path) + + return image_path def espcn_superscaler(img): @@ -71,6 +84,16 @@ def fsrcnn_superscaler(img): return result +def get_user_agent() -> str: + user_agents = [ + "Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0", + "Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0", + "Mozilla/5.0 (X11; Linux x86_64; rv:95.0) Gecko/20100101 Firefox/95.0", + ] + + return user_agents[random.randint(0, len(user_agents) - 1)] + + def get_proxies() -> typing.Dict: """Get the proxy env vars.""" http_proxy: typing.Optional[str] = None @@ -91,7 +114,11 @@ def get_proxies() -> typing.Dict: def single_get(url: str) -> requests.Response: """A simple get.""" return requests.get( - url, allow_redirects=True, timeout=10, proxies=get_proxies() + url, + allow_redirects=True, + timeout=10, + proxies=get_proxies(), + headers={"User-Agent": get_user_agent()}, ) @@ -110,6 +137,7 @@ def single_get_tag(url_tag_pair: list) -> typing.Tuple[requests.Response, str]: allow_redirects=True, timeout=10, proxies=get_proxies(), + headers={"User-Agent": get_user_agent()}, ), url_tag_pair[1], ) @@ -124,7 +152,9 @@ def multi_get_tag( return response_list -def model_downloader() -> None: +async def model_downloader() -> typing.Optional[ + typing.List[typing.Tuple[str, str]] +]: """Download the models.""" down_list = [ "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x3.pb", @@ -155,10 +185,10 @@ def model_downloader() -> None: with open(model_path + "/" + name, mode="b+w") as downed: downed.write(response.content) - return None + return url_tag_list -def download_all_images(url: str) -> None: +async def download_all_images(url: str) -> None: """Sniffs images.""" response = requests.get(url, timeout=10, allow_redirects=True) if response.content is None: @@ -196,11 +226,33 @@ def serve(port_number: int) -> None: httpd.serve_forever() +async def get_images(url) -> None: + """Get the images with a headless browser because CORS.""" + async with playwright.async_api.async_playwright as async_p: + for browser_type in [ + async_p.chromium, + async_p.firefox, + async_p.webkit, + ]: + browser = await browser_type.launch() + page = await browser.new_page() + await page.goto(url) + image_list: typing.List = [] + all_links = page.query_selector_all("img") + await browser.close() + + +async def handle_downloads(argparser: Argparser) -> None: + """Download the models and the images.""" + await asyncio.gather( + model_downloader(), download_all_images(argparser.args.url) + ) + + def main() -> None: """Entry point.""" argparser = Argparser() - model_downloader() - download_all_images(argparser.args.url) + asyncio.run(handle_downloads(argparser)) if __name__ == "__main__": |