aboutsummaryrefslogtreecommitdiffstats
path: root/magni.py
diff options
context:
space:
mode:
authorterminaldweller <thabogre@gmail.com>2022-11-23 10:29:47 +0000
committerterminaldweller <thabogre@gmail.com>2022-11-23 10:29:47 +0000
commitce34198e8dd8ce259695cdb510bb11f10a62376b (patch)
tree918b36b6dadfb370677150c86bb0174843a4fa9d /magni.py
parentupdate (diff)
downloadmagni-ce34198e8dd8ce259695cdb510bb11f10a62376b.tar.gz
magni-ce34198e8dd8ce259695cdb510bb11f10a62376b.zip
WIP
Diffstat (limited to 'magni.py')
-rwxr-xr-xmagni.py207
1 files changed, 207 insertions, 0 deletions
diff --git a/magni.py b/magni.py
new file mode 100755
index 0000000..3553869
--- /dev/null
+++ b/magni.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python
+"""Magni."""
+
+import argparse
+import concurrent.futures
+import http.server
+import os
+import socketserver
+import typing
+
+import bs4
+import cv2 # type:ignore
+import requests
+
+
+class Argparser: # pylint: disable=too-few-public-methods
+ """Argparser class."""
+
+ def __init__(self):
+ self.parser = argparse.ArgumentParser()
+ self.parser.add_argument(
+ "--url",
+ "-u",
+ type=str,
+ help="the url to the page containing the images",
+ default=False,
+ )
+ self.parser.add_argument(
+ "--port",
+ "-p",
+ type=int,
+ help="the port to serve the images over",
+ default=6666,
+ )
+ self.args = self.parser.parse_args()
+
+
+def get_model_path() -> str:
+ """Get the model path."""
+ # FIXME- add path if it doesnt exist
+ if "MAGNI_MODEL_PATH" in os.environ and os.environ["MAGNI_MODEL_PATH"]:
+ return os.environ["MAGNI_MODEL_PATH"]
+ return "./models"
+
+
+def get_image_path() -> str:
+ """Get the image path."""
+ # FIXME- add path if it doesnt exist
+ if "MAGNI_IMAGE_PATH" in os.environ and os.environ["MAGNI_IMAGE_PATH"]:
+ return os.environ["MAGNI_IMAGE_PATH"]
+ return "./images"
+
+
+def espcn_superscaler(img):
+ """ESPCN superscaler."""
+ superres = cv2.dnn_superres.DnnSuperResImpl_create()
+ path = get_model_path() + "/" + "ESPCN_x3.pb"
+ superres.readModel(path)
+ superres.setModel("espcn", 3)
+ result = superres.upsample(img)
+ return result
+
+
+def fsrcnn_superscaler(img):
+ """FSRCNN superscaler"""
+ superres = cv2.dnn_superres.DnnSuperResImpl_create()
+ path = get_model_path() + "/" + "FSRCNN_x3.pb"
+ superres.readModel(path)
+ superres.setModel("fsrcnn", 3)
+ result = superres.upsample(img)
+ return result
+
+
+def get_proxies() -> typing.Dict:
+ """Get the proxy env vars."""
+ http_proxy: typing.Optional[str] = None
+ if "HTTP_PROXY" in os.environ and os.environ["HTTP_PROXY"] != "":
+ http_proxy = os.environ["HTTP_PROXY"]
+
+ https_proxy: typing.Optional[str] = None
+ if "HTTPS_PROXY" in os.environ and os.environ["HTTPS_PROXY"] != "":
+ https_proxy = os.environ["HTTPS_PROXY"]
+
+ no_proxy: typing.Optional[str] = None
+ if "NO_PROXY" in os.environ and os.environ["NO_PROXY"] != "":
+ no_proxy = os.environ["NO_PROXY"]
+
+ return {"http": http_proxy, "https": https_proxy, "no_proxy": no_proxy}
+
+
+def single_get(url: str) -> requests.Response:
+ """A simple get."""
+ return requests.get(
+ url, allow_redirects=True, timeout=10, proxies=get_proxies()
+ )
+
+
+def multi_get(urls: list) -> list:
+ """Async get."""
+ with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
+ response_list = list(pool.map(single_get, urls))
+ return response_list
+
+
+def single_get_tag(url_tag_pair: list) -> typing.Tuple[requests.Response, str]:
+ """A simple get with a tag."""
+ return (
+ requests.get(
+ url_tag_pair[0],
+ allow_redirects=True,
+ timeout=10,
+ proxies=get_proxies(),
+ ),
+ url_tag_pair[1],
+ )
+
+
+def multi_get_tag(
+ urls: typing.List[typing.Tuple[str, str]],
+) -> typing.Optional[typing.List[typing.Tuple[requests.Response, str]]]:
+ """Async get with a tag."""
+ with concurrent.futures.ThreadPoolExecutor(max_workers=16) as pool:
+ response_list = list(pool.map(single_get_tag, urls))
+ return response_list
+
+
+def model_downloader() -> None:
+ """Download the models."""
+ down_list = [
+ "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x3.pb",
+ "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x2.pb",
+ "https://github.com/fannymonori/TF-ESPCN/raw/master/export/ESPCN_x4.pb",
+ "https://github.com/Saafke/FSRCNN_Tensorflow/raw/master/models/FSRCNN_x4.pb",
+ "https://github.com/Saafke/FSRCNN_Tensorflow/raw/master/models/FSRCNN_x3.pb",
+ "https://github.com/Saafke/FSRCNN_Tensorflow/raw/master/models/FSRCNN_x2.pb",
+ ]
+
+ url_tag_list: typing.List[typing.Tuple[str, str]] = []
+ for url in down_list:
+ url_tag_list.append((url, url[url.rfind("/") + 1 :]))
+ response_list: typing.Optional[
+ typing.List[typing.Tuple[requests.Response, str]]
+ ] = multi_get_tag(url_tag_list)
+
+ model_path: str = os.getcwd()
+ if (
+ "MAGNI_MODEL_PATH" in os.environ
+ and os.environ["MAGNI_MODEL_PATH"] != ""
+ ):
+ model_path = os.environ["MAGNI_MODEL_PATH"]
+
+ if response_list is None:
+ return None
+ for response, name in response_list:
+ with open(model_path + "/" + name, mode="b+w") as downed:
+ downed.write(response.content)
+
+ return None
+
+
+def download_all_images(url: str) -> None:
+ """Sniffs images."""
+ response = requests.get(url, timeout=10, allow_redirects=True)
+ if response.content is None:
+ return None
+
+ soup = bs4.BeautifulSoup(response.content, "lxml")
+ search_results = soup.findAll("img")
+
+ image_url_list: typing.List[typing.Tuple[str, str]] = [
+ (result["src"], result["src"][result["src"].rfind("/") + 1 :])
+ for result in search_results
+ ]
+
+ print(image_url_list)
+ response_list: typing.Optional[
+ typing.List[typing.Tuple[requests.Response, str]]
+ ] = multi_get_tag(image_url_list)
+ print(response_list)
+
+ if response_list is None:
+ return None
+
+ for response, name in response_list:
+ with open(get_image_path() + "/" + name, "w+b") as image:
+ image.write(response.content)
+
+ return None
+
+
+def serve(port_number: int) -> None:
+ """Startup a simple http file server."""
+ handler = http.server.SimpleHTTPRequestHandler
+
+ with socketserver.TCPServer(("", port_number), handler) as httpd:
+ httpd.serve_forever()
+
+
+def main() -> None:
+ """Entry point."""
+ argparser = Argparser()
+ model_downloader()
+ download_all_images(argparser.args.url)
+
+
+if __name__ == "__main__":
+ main()