#!/usr/bin/env python
"""Magni."""

import argparse
import asyncio
import concurrent.futures
import http.server
import os
import random
import secrets
import socketserver
import sys
import typing

import bs4
import cv2  # type:ignore
import jinja2
import requests


class Argparser:  # pylint: disable=too-few-public-methods
    """Parse the command line options.

    The parsed namespace is stored in ``self.args`` and exposes ``url``,
    ``method`` and ``port``.
    """

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument(
            "--url",
            "-u",
            type=str,
            help="the url to the page containing the images",
            default="",
        )
        self.parser.add_argument(
            "--method",
            "-m",
            type=str,
            # Reject unknown methods up front instead of failing after all
            # the downloads have already been done.
            choices=("espcn", "fsrcnn"),
            help="the method to use. either fsrcnn or espcn",
            default="espcn",
        )
        self.parser.add_argument(
            "--port",
            "-p",
            type=int,
            help="the port to serve the images over",
            default=8086,
        )
        self.args = self.parser.parse_args()


def get_manganato_headers(url: str) -> typing.Dict[str, str]:
    """Return the headers sent with a download request.

    ``url`` is used as the ``Referer`` value; the ``User-Agent`` comes from
    ``get_user_agent()``.
    """
    return {
        "Accept": "image/avif,image/webp,*/*",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Sec-Fetch-Dest": "image",
        "Sec-Fetch-Mode": "no-cors",
        "Sec-Fetch-Site": "cross-site",
        "Sec-GPC": "1",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
        "TE": "trailers",
        "Referer": url,
        "User-Agent": get_user_agent(),
    }


def get_model_path() -> str:
    """Return the model directory, creating it if needed.

    Uses ``MAGNI_MODEL_PATH`` when it is set and non-empty, ``./models``
    otherwise.
    """
    model_path: str = os.environ.get("MAGNI_MODEL_PATH") or "./models"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(model_path, exist_ok=True)
    return model_path


def get_image_path() -> str:
    """Return the image directory, creating it if needed.

    Uses ``MAGNI_IMAGE_PATH`` when it is set and non-empty, ``./images``
    otherwise.
    """
    image_path: str = os.environ.get("MAGNI_IMAGE_PATH") or "./images"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(image_path, exist_ok=True)
    return image_path


# TODO-both models are garbage. should train models specifically
# for black and white pics.
def espcn_superscaler(img):
    """Upscale ``img`` by a factor of 3 with the ESPCN model.

    The model file ``ESPCN_x3.pb`` is read from the model directory on
    every call.
    """
    superres = cv2.dnn_superres.DnnSuperResImpl_create()
    superres.readModel(os.path.join(get_model_path(), "ESPCN_x3.pb"))
    superres.setModel("espcn", 3)
    return superres.upsample(img)


def fsrcnn_superscaler(img):
    """Upscale ``img`` by a factor of 3 with the FSRCNN model.

    The model file ``FSRCNN_x3.pb`` is read from the model directory on
    every call.
    """
    superres = cv2.dnn_superres.DnnSuperResImpl_create()
    superres.readModel(os.path.join(get_model_path(), "FSRCNN_x3.pb"))
    superres.setModel("fsrcnn", 3)
    return superres.upsample(img)


# flake8: noqa: E501
def get_user_agent() -> str:
    """Return a user agent string picked at random from the known list."""
    # Only one entry for now; the random pick is kept so that more agents
    # can be added to the list without touching the selection logic.
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36",
    ]
    return user_agents[secrets.randbelow(len(user_agents))]


def get_proxies() -> typing.Dict[str, typing.Optional[str]]:
    """Return the proxy settings read from the environment.

    The keys are ``http``, ``https`` and ``no_proxy``; a value is ``None``
    when the matching ``HTTP_PROXY`` / ``HTTPS_PROXY`` / ``NO_PROXY``
    variable is unset or empty.
    """
    return {
        "http": os.environ.get("HTTP_PROXY") or None,
        "https": os.environ.get("HTTPS_PROXY") or None,
        "no_proxy": os.environ.get("NO_PROXY") or None,
    }


def single_get(url: str) -> requests.Response:
    """GET ``url`` with the project's proxies, headers and a 10s timeout."""
    return requests.get(
        url,
        allow_redirects=True,
        timeout=10,
        proxies=get_proxies(),
        headers=get_manganato_headers(url),
    )


def multi_get(urls: list) -> list:
    """GET all ``urls`` on a thread pool; responses keep the input order."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
        return list(pool.map(single_get, urls))


def single_get_tag(
    url_tag_pair: typing.Tuple[str, str],
) -> typing.Tuple[requests.Response, str]:
    """GET ``url_tag_pair[0]`` and return the response with its tag.

    The tag (``url_tag_pair[1]``) is passed through untouched so the caller
    can tell which response belongs to which request.
    """
    return single_get(url_tag_pair[0]), url_tag_pair[1]


def multi_get_tag(
    urls: typing.List[typing.Tuple[str, str]],
) -> typing.Optional[typing.List[typing.Tuple[requests.Response, str]]]:
    """GET all ``(url, tag)`` pairs on a thread pool.

    Returns ``(response, tag)`` tuples in the same order as ``urls``.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
        return list(pool.map(single_get_tag, urls))


# flake8: noqa: E501
async def model_downloader() -> typing.Optional[typing.List[typing.Tuple[str, str]]]:
    """Download the models that are missing from the model directory.

    Returns the ``(url, file name)`` pairs that were requested, or ``None``
    when the download step yields no response list.
    """
    down_list = [
        "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x3.pb",
        "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x2.pb",
        "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x4.pb",
        "https://github.com/Saafke/FSRCNN_Tensorflow/raw/master/models/FSRCNN_x4.pb",
        "https://github.com/Saafke/FSRCNN_Tensorflow/raw/master/models/FSRCNN_x3.pb",
        "https://github.com/Saafke/FSRCNN_Tensorflow/raw/master/models/FSRCNN_x2.pb",
    ]

    # Use the same directory for the existence check and for the write, so
    # the models end up where the superscalers look for them.
    model_path: str = get_model_path()

    url_tag_list: typing.List[typing.Tuple[str, str]] = []
    for url in down_list:
        name = url[url.rfind("/") + 1 :]
        if not os.path.exists(os.path.join(model_path, name)):
            url_tag_list.append((url, name))

    response_list: typing.Optional[
        typing.List[typing.Tuple[requests.Response, str]]
    ] = multi_get_tag(url_tag_list)
    if response_list is None:
        return None

    for response, name in response_list:
        if not response.ok:
            # Writing an error page as a model file would make the file
            # "exist" and block any later re-download.
            print(
                f"failed to download model {name}: HTTP {response.status_code}",
                file=sys.stderr,
            )
            continue
        with open(os.path.join(model_path, name), mode="wb") as downed:
            downed.write(response.content)

    return url_tag_list


async def download_all_images(url: str) -> typing.Optional[typing.List[str]]:
    """Download every ``<img>`` referenced by the page at ``url``.

    The images are written to the image directory. Returns the file names
    that were written, or ``None`` when the page itself cannot be fetched.
    """
    # Local import: keeps this function self-contained with respect to the
    # module's import block.
    from urllib.parse import urljoin, urlsplit

    response = requests.get(url, timeout=10, allow_redirects=True)
    if not response.ok:
        return None

    soup = bs4.BeautifulSoup(response.content, "lxml")

    image_url_list: typing.List[typing.Tuple[str, str]] = []
    for result in soup.find_all("img"):
        src = result.get("src")
        if not src:
            # <img> without a src attribute: nothing to download.
            continue
        # Resolve relative sources against the page url.
        image_url = urljoin(url, src)
        parts = urlsplit(image_url)
        if parts.scheme not in ("http", "https"):
            # e.g. inline "data:" images, which cannot be fetched over HTTP.
            continue
        # Name the file after the last path segment, without the query.
        name = parts.path.rsplit("/", 1)[-1]
        if not name:
            continue
        image_url_list.append((image_url, name))
    print(image_url_list)

    response_list: typing.Optional[
        typing.List[typing.Tuple[requests.Response, str]]
    ] = multi_get_tag(image_url_list)
    print(response_list)
    if response_list is None:
        return None

    image_dir = get_image_path()
    image_name_list: typing.List[str] = []
    for image_response, name in response_list:
        if not image_response.ok:
            print(
                f"failed to download image {name}: HTTP {image_response.status_code}",
                file=sys.stderr,
            )
            continue
        with open(os.path.join(image_dir, name), "wb") as image:
            image.write(image_response.content)
        image_name_list.append(name)

    return image_name_list


def superres_images(image_list: typing.List[str], method: str) -> None:
    """Upscale the named images in the image directory, in place.

    ``method`` selects the model: ``"espcn"`` or ``"fsrcnn"``.

    Raises:
        ValueError: if ``method`` is not one of the supported names.
    """
    scalers = {"espcn": espcn_superscaler, "fsrcnn": fsrcnn_superscaler}
    if method not in scalers:
        raise ValueError(f"unknown superres method: {method!r}")
    upscale = scalers[method]

    image_dir = get_image_path()
    for image in image_list:
        path = os.path.join(image_dir, image)
        img = cv2.imread(path)
        if img is None:
            # cv2.imread gives None for files it cannot decode.
            print(f"skipping unreadable image: {path}", file=sys.stderr)
            continue
        cv2.imwrite(path, upscale(img))


async def handle_downloads(
    argparser: Argparser,
) -> typing.Optional[typing.List[str]]:
    """Download the models and the images.

    Returns the image name list produced by ``download_all_images``.
    """
    _, image_name_list = await asyncio.gather(
        model_downloader(), download_all_images(argparser.args.url)
    )
    return image_name_list


def fill_jinja_template(image_name_list: typing.List[str]) -> None:
    """Render ``template.jinja2`` into ``index.html`` in the image directory.

    The template is loaded from the current working directory and rendered
    with ``image_list`` set to ``image_name_list``.
    """
    environment = jinja2.Environment(
        autoescape=True,
        loader=jinja2.FileSystemLoader(os.getcwd()),
    )
    template = environment.get_template("template.jinja2")
    rendered = template.render({"image_list": image_name_list})
    with open(
        os.path.join(get_image_path(), "index.html"), encoding="utf-8", mode="w"
    ) as out_file:
        out_file.write(rendered)


class MagniHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """We want to serve our own index.html from an arbitrary location."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, directory=get_image_path(), **kwargs)


def serve(port: int) -> None:
    """Start a simple http file server on ``port`` and block until stopped.

    Ctrl-C stops the server; the socket is closed on the way out by the
    ``with`` block.
    """
    handler = MagniHTTPRequestHandler

    print(f"now servering on{repr(port)}")
    with socketserver.TCPServer(("", port), handler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the server, so leave the
            # loop without a traceback.
            print("shutting down.")


def main() -> None:
    """Entry point: download, upscale, render the index and serve it."""
    argparser = Argparser()
    image_name_list = asyncio.run(handle_downloads(argparser))

    if image_name_list is None:
        print("failed to download all images.", file=sys.stderr)
        sys.exit(1)

    superres_images(image_name_list, argparser.args.method)
    print("finished superresing images.")
    fill_jinja_template(image_name_list)
    serve(argparser.args.port)


if __name__ == "__main__":
    main()