aboutsummaryrefslogtreecommitdiffstats
path: root/devourer.py
diff options
context:
space:
mode:
author terminaldweller <thabogre@gmail.com> 2022-05-19 19:54:35 +0000
committer terminaldweller <thabogre@gmail.com> 2022-05-19 19:54:35 +0000
commit fd6f77c7ec5aad1d6fe15f42f681820d10f34b93 (patch)
tree 22aec3ae1d7ff778a648cf35854bae2a25c5d27a /devourer.py
parent push before a new branch (diff)
download devourer-fd6f77c7ec5aad1d6fe15f42f681820d10f34b93.tar.gz
devourer-fd6f77c7ec5aad1d6fe15f42f681820d10f34b93.zip
restructuring. added a new service to handle the downloading
Diffstat (limited to 'devourer.py')
-rw-r--r--devourer.py445
1 files changed, 0 insertions, 445 deletions
diff --git a/devourer.py b/devourer.py
deleted file mode 100644
index 34185ae..0000000
--- a/devourer.py
+++ /dev/null
@@ -1,445 +0,0 @@
-# _*_ coding=utf-8 _*_
-
-import bs4
-import contextlib
-import datetime
-import fastapi
-import gtts
-import logging
-import newspaper
-import nltk
-import os
-import random
-import re
-import readability
-import requests
-import string
-import tempfile
-import tika
-from tika import parser as tparser
-import transformers
-
-
# FIXME: maybe set up real logging (handlers, levels) instead of relying on
# the root logger.
def logError(err: "str | BaseException") -> None:
    """Log an error at ERROR level through the root logger.

    Delegates to ``logging.exception`` so that, when called from inside an
    ``except`` block, the active traceback is attached to the record.

    Args:
        err: The message or exception to log. The callers in this module
            pass an already-formatted string, so both are accepted.
    """
    logging.exception(err)
-
-
def isAGoodResponse(resp: requests.Response) -> bool:
    """Check that a response is a 200 and carries a Content-Type header.

    Args:
        resp: The response object to inspect; only ``status_code`` and
            ``headers`` are read.

    Returns:
        True when the status code is 200 and a ``Content-Type`` header is
        present, False otherwise.
    """
    # .get() instead of indexing: a missing header must yield False rather
    # than raise KeyError, which the callers' RequestException handlers
    # would not catch.
    content_type = resp.headers.get("Content-Type")
    return resp.status_code == 200 and content_type is not None
-
-
def simpleGet(url: str, timeout: float = 30.0) -> "bytes | None":
    """Issue a GET request and return the raw response body.

    Args:
        url: The address to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the caller indefinitely.

    Returns:
        The response body as bytes, or None when the response is rejected
        by ``isAGoodResponse`` or the request raises a
        ``requests.exceptions.RequestException``.
    """
    try:
        with contextlib.closing(
            requests.get(url, stream=True, timeout=timeout)
        ) as resp:
            if not isAGoodResponse(resp):
                return None
            return resp.content
    except requests.exceptions.RequestException as e:
        logError("Error during requests to {0} : {1}".format(url, str(e)))
        return None
-
-
def getWithParams(
    url: str, params: dict, timeout: float = 30.0
) -> "dict | list | None":
    """Issue a GET request with query parameters and decode the JSON reply.

    Args:
        url: The address to fetch.
        params: Query-string parameters passed to ``requests.get``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON document (whatever ``resp.json()`` yields), or None
        when the response is rejected by ``isAGoodResponse``, the request
        fails, or the body is not valid JSON.
    """
    try:
        with contextlib.closing(
            requests.get(url, params=params, stream=True, timeout=timeout)
        ) as resp:
            if not isAGoodResponse(resp):
                return None
            return resp.json()
    # ValueError covers a body that is not valid JSON; depending on the
    # requests version the decode error is not a RequestException.
    except (requests.exceptions.RequestException, ValueError) as e:
        logError("Error during requests to {0} : {1}".format(url, str(e)))
        return None
-
-
def getRandStr(n):
    """Return a random string of ``n`` lowercase ASCII letters.

    Uses the ``random`` module, so the result is suitable for throwaway
    names but not for anything security-sensitive.

    Args:
        n: Length of the string to produce; 0 yields an empty string.
    """
    # string.lowercase only existed on Python 2; ascii_lowercase is the
    # Python 3 name.
    return "".join(random.choice(string.ascii_lowercase) for _ in range(n))
-
-
def getURLS(source: str) -> dict:
    """Extract the http(s) URLs that appear in the markup of a web page.

    The page is fetched with ``simpleGet``, parsed with BeautifulSoup and
    the ``repr`` of the parsed tree is scanned with a URL regex.

    Args:
        source: Address of the page to scan.

    Returns:
        A dict mapping every URL found to itself, in order of first
        appearance. Empty when the page could not be fetched.
    """
    raw_ml = simpleGet(source)
    if raw_ml is None:
        return {}
    ml_str = repr(bs4.BeautifulSoup(raw_ml, "lxml"))
    # The pattern's character classes exclude newlines, so scanning the
    # whole string finds the same matches as scanning it line by line and
    # no scratch file on disk is needed.
    url_list = re.findall(
        "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*(),]|"
        r"(?:%[0-9a-fA-F][0-9a-fA-F]))+",
        ml_str,
    )
    return {url: url for url in url_list}
-
-
def configNews(config: newspaper.Config) -> None:
    """Apply this module's newspaper settings to ``config`` in place.

    Image fetching and article memoization are switched off, the article
    HTML is kept, and a fixed browser user-agent string is set.
    """
    settings = {
        "fetch_images": False,
        "keep_article_html": True,
        "memoize_articles": False,
        "browser_user_agent": "Chrome/91.0.4464.5",
    }
    for option, value in settings.items():
        setattr(config, option, value)
-
-
# FIXME-have to decide whether to use files or urls
def pdfToVoice(path: str = "") -> str:
    """Convert a local PDF file to an mp3 and return the mp3's file name.

    Args:
        path: Path of the PDF to read. Defaults to the empty string so
            existing zero-argument callers keep working; without a path
            there is nothing to convert.

    Returns:
        The name of the mp3 written to the current working directory, or
        the empty string when no path was given or the conversion failed.
    """
    outfile = ""
    if not path:
        logging.error("pdfToVoice was called without a PDF path.")
        return outfile
    try:
        rawText = tparser.from_file(path)
        tts = gtts.gTTS(rawText["content"])
        candidate = getRandStr(20) + ".mp3"
        tts.save(candidate)
        # Only report the file once it has actually been written, so
        # callers never try to open a file that does not exist.
        outfile = candidate
    except Exception as e:
        logging.exception(e)
    return outfile
-
-
def extractRequirements(textBody: str) -> list:
    """Extract the sentences containing the keywords that denote a requirement.

    The keywords are based on ISO/IEC directives, part 2:
    https://www.iso.org/sites/directives/current/part2/index.xhtml

    Args:
        textBody: Plain text to split into sentences with
            ``nltk.sent_tokenize``.

    Returns:
        The matching sentences in their original order, each listed once.
    """
    REQ_KEYWORDS = [
        "shall",
        "shall not",
        "should",
        "should not",
        "must",
        "may",
        "can",
        "cannot",
    ]
    # Word boundaries stop "can" from matching "scan" or "may" from
    # matching "dismay".
    keyword_re = re.compile(
        r"\b(?:" + "|".join(re.escape(k) for k in REQ_KEYWORDS) + r")\b"
    )
    # One search per sentence: a sentence is reported once even when it
    # contains several keywords (e.g. "shall not" also contains "shall").
    return [
        sentence
        for sentence in nltk.sent_tokenize(textBody)
        if keyword_re.search(sentence.casefold())
    ]
-
-
def pdfToText(url: str) -> str:
    """Download the PDF at ``url`` and return its text as extracted by Tika.

    The Tika server address is read from the ``TIKA_SERVER_ENDPOINT``
    environment variable.

    Args:
        url: Address of the PDF to download.

    Returns:
        The extracted text, or the empty string when the download or the
        extraction fails or Tika reports no content.
    """
    content = ""
    try:
        pdf_bytes = simpleGet(url)
        if pdf_bytes is None:
            logging.error("Could not download a PDF from %s", url)
            return content
        with tempfile.NamedTemporaryFile(mode="w+b", delete=True) as tmpFile:
            tmpFile.write(pdf_bytes)
            # Tika opens the file by name, so buffered bytes must reach the
            # disk before it reads.
            tmpFile.flush()
            tikaResult = tparser.from_file(
                tmpFile.name, serverEndpoint=os.environ["TIKA_SERVER_ENDPOINT"]
            )
        logging.debug("tika metadata: %s", tikaResult.get("metadata"))
        # A missing or None "content" entry is normalised to "" so the
        # declared str return type holds.
        content = tikaResult.get("content") or ""
    except Exception as e:
        logging.exception(e)
    return content
-
-
def summarizeText(text: str, summary_max_length: int = 5) -> list:
    """Summarize the given text using bart (``facebook/bart-large-cnn``).

    The model and tokenizer are loaded on every call.

    Args:
        text: The text to summarize.
        summary_max_length: ``max_length`` passed to ``model.generate``.
            The default of 5 keeps the previous hard-coded value; callers
            that want a longer summary should pass a larger number.

    Returns:
        A list of decoded summaries, one per generated sequence. Since a
        single text is encoded, it is expected to hold one string.
    """
    model = transformers.BartForConditionalGeneration.from_pretrained(
        "facebook/bart-large-cnn"
    )
    tokenizer = transformers.BartTokenizer.from_pretrained(
        "facebook/bart-large-cnn"
    )
    # truncation=True makes max_length effective, so longer inputs are cut
    # to 1024 tokens instead of being passed to the model at full length.
    inputs = tokenizer(
        [text], max_length=1024, truncation=True, return_tensors="pt"
    )
    summary_ids = model.generate(
        inputs["input_ids"],
        num_beams=4,
        max_length=summary_max_length,
        early_stopping=True,
    )
    return [
        tokenizer.decode(
            g, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )
        for g in summary_ids
    ]
-
-
def summarizeText_v2(text: str) -> str:
    """Placeholder for a second summarizer; not implemented, returns None."""
    return None
-
-
def textToAudio(text: str) -> str:
    """Transform the given text into an mp3 file using gTTS.

    The file is written to the directory named by the ``AUDIO_DUMP_DIR``
    environment variable and named after the current timestamp.

    Args:
        text: The text to speak.

    Returns:
        The path of the mp3 that was written, or the empty string when the
        conversion failed for any reason.
    """
    path = ""
    try:
        # %H is part of the stamp so two files written at the same
        # minute/second of different hours cannot share a name.
        time_str = datetime.datetime.today().strftime("%b-%d-%Y-%H-%M-%S-%f")
        target = os.path.join(os.environ["AUDIO_DUMP_DIR"], time_str + ".mp3")
        gtts.gTTS(text).save(target)
        path = target
    except Exception as e:
        logging.exception(e)
    return path
-
-
def getRequirements(url: str, sourcetype: str) -> list:
    """Collect the requirement sentences found at ``url``.

    Args:
        url: Address of the site (``"html"``) or text document (``"text"``).
        sourcetype: ``"html"`` walks the articles newspaper discovers for
            the site; ``"text"`` downloads the URL and decodes it as UTF-8.
            Any other value yields an empty list.

    Returns:
        The sentences selected by ``extractRequirements``. Failures are
        logged and whatever was collected up to that point is returned.
    """
    results = []
    try:
        if sourcetype == "html":
            site = newspaper.build(url)
            for article in site.articles:
                try:
                    a = newspaper.Article(article.url)
                    a.download()
                    a.parse()
                    doc = readability.Document(a.html)
                    # extractRequirements needs plain text: take the
                    # readable part of the page and strip its markup.
                    text = bs4.BeautifulSoup(doc.summary(), "lxml").get_text()
                    results.extend(extractRequirements(text))
                except Exception as e:
                    # One broken article must not discard the others.
                    logging.exception(e)
        elif sourcetype == "text":
            bytesText = simpleGet(url)
            if bytesText is not None:
                results = extractRequirements(bytesText.decode("utf-8"))
        else:
            logging.error("Unsupported sourcetype: %s", sourcetype)
    except Exception as e:
        logging.exception(e)
    return results
-
-
# FIXME-summary=bart doesnt work
def summarizeLinkToAudio(url, summary) -> str:
    """Download the article at ``url`` and return its text or its summary.

    Despite the name, no audio is produced here; the result is a string.

    ``summary`` selects the output: ``"newspaper"`` returns newspaper's own
    summary, while ``"none"`` and ``"bart"`` both return the full article
    text. Any other value prints a notice and leaves the result empty.
    Exceptions are logged and also leave the result empty.
    """
    text_out = str()
    try:
        page = newspaper.Article(url)
        page.download()
        page.parse()
        if summary in ("none", "bart"):
            text_out = page.text
        elif summary == "newspaper":
            page.nlp()
            text_out = page.summary
        else:
            print("invalid option for summary type.")
    except Exception as exc:
        logging.exception(exc)
    finally:
        return text_out
-
-
# FIXME-change my name
def summarizeLinksToAudio(url, summary) -> str:
    """Summarize every URL found on the page at ``url`` into one text.

    Despite the name, no audio is produced here; the result is a string.

    Args:
        url: Address of the page whose links are collected with ``getURLS``.
        summary: Summary mode forwarded to ``summarizeLinkToAudio`` for
            each link.

    Returns:
        The non-empty per-link results joined by newlines, or the empty
        string when nothing could be extracted.
    """
    results = []
    try:
        # getURLS takes only the page address; the summary mode belongs to
        # the per-link call.
        for link in getURLS(url):
            results.append(summarizeLinkToAudio(link, summary))
    except Exception as e:
        logging.exception(e)
    # Empty entries (failed links) are dropped so that an all-failed run
    # still yields "" and callers' emptiness checks stay meaningful.
    return "\n".join(text for text in results if text)
-
-
def searchWikipedia(search_term: str, summary: str) -> str:
    """Search wikipedia for a string and return the text of the first hit.

    reference: https://www.mediawiki.org/wiki/API:Opensearch

    The search endpoint is read from the ``WIKI_SEARCH_URL`` environment
    variable.

    Args:
        search_term: The text to search for.
        summary: Summary mode forwarded to ``summarizeLinkToAudio``.

    Returns:
        Whatever ``summarizeLinkToAudio`` returns for the first result URL,
        or the empty string when the search fails or has no results.
    """
    result = ""
    try:
        searchParams = {
            "action": "opensearch",
            "namespace": "0",
            "search": search_term,
            "limit": "10",
            "format": "json",
        }
        res = getWithParams(os.environ["WIKI_SEARCH_URL"], searchParams)
        # Element 3 of the reply is read as the list of result URLs (see
        # the API reference above). A failed request or an empty result
        # list is reported explicitly instead of through an exception.
        if not res or len(res) < 4 or not res[3]:
            logging.error("No wikipedia result for %r", search_term)
            return result
        # FIXME-handle wiki redirects/disambiguations
        result = summarizeLinkToAudio(res[3][0], summary)
    except Exception as e:
        logging.exception(e)
    return result
-
-
def getAudioFromFile(audio_path: str) -> bytes:
    """Return the contents of a file in binary format.

    Args:
        audio_path: Path of the file to read.

    Returns:
        The whole file as bytes.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(audio_path, "rb") as audio:
        return audio.read()
-
-
def getSentiments(detailed: bool) -> list:
    """Run sentiment analysis on every page linked from the source list.

    Args:
        detailed: Currently unused.

    Returns:
        One classifier result per linked page that could be downloaded.
    """
    results = []
    SOURCE = "https://github.com/coinpride/CryptoList"
    classifier = transformers.pipeline("sentiment-analysis")
    # The source page is HTML, so its links are extracted with getURLS
    # before fetching; iterating the raw bytes would yield integers.
    for url in getURLS(SOURCE):
        page = simpleGet(url)
        if page is None:
            continue
        # The classifier works on text, not on raw markup bytes.
        text = bs4.BeautifulSoup(page, "lxml").get_text()
        results.append(classifier(text, truncation=True))
    return results
-
-
# Application object; the middleware and route decorators in this module
# register themselves on it.
app = fastapi.FastAPI()
-
-
# https://cheatsheetseries.owasp.org/cheatsheets/REST_Security_Cheat_Sheet.html
@app.middleware("http")
async def addSecureHeaders(
    request: fastapi.Request, call_next
) -> fastapi.Response:
    """Add the security headers proposed by OWASP to every response.

    Args:
        request: The incoming request, forwarded unchanged.
        call_next: Callable that produces the response for ``request``.

    Returns:
        The downstream response with the security headers set.
    """
    response = await call_next(request)
    response.headers["Cache-Control"] = "no-store"
    # A CSP directive and its source list are separate tokens
    # ("default-src https:"); a fused value is not a valid directive.
    response.headers["Content-Security-Policy"] = "default-src https:"
    response.headers["Strict-Transport-Security"] = "max-age=63072000"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Access-Control-Allow-Methods"] = "GET,OPTIONS"
    return response
-
-
# Import-time setup. The "punkt" data is downloaded for the sentence
# tokenizer (nltk.sent_tokenize is called in extractRequirements).
nltk.download("punkt")
# Default summarization pipeline, built once at import. It is not
# referenced by any other code shown in this module.
transformers_summarizer = transformers.pipeline("summarization")
-
-
@app.get("/mila/pdf")
def pdf_ep(url: str, feat: str, audio: bool = False, summarize: bool = False):
    """Extract the text of the PDF at ``url`` and optionally summarize it.

    Args:
        url: Address of the PDF to fetch.
        feat: Required query parameter; not used by this handler.
        audio: Accepted but not acted on; this endpoint always answers
            with JSON.
        summarize: When true, the extracted text is passed through
            ``summarizeText`` before being returned.

    Returns:
        A dict holding the result and a success flag.
    """
    text = pdfToText(url)
    if summarize:
        text = summarizeText(text)
    # Truthiness instead of `!= ""`: the result may be a string, a list of
    # summaries or None, and only a non-empty value counts as success.
    ok = bool(text)
    return {
        "Content-Type": "application/json",
        "isOk": ok,
        # Same flag under the spelling the other endpoints use; "isOk" is
        # kept so existing clients are unaffected.
        "isOK": ok,
        "result": text,
    }
-
-
@app.get("/mila/tika")
def pdf_to_audio_ep(url: str):
    """Turn the PDF at ``url`` into an audio file.

    Args:
        url: Address of the PDF to fetch.

    Returns:
        An ``audio/mpeg`` response with the spoken text, or with an empty
        body when extraction or speech synthesis failed.
    """
    # The PDF named by the request is extracted and then spoken; both
    # helpers report failure with an empty string.
    text = pdfToText(url)
    audio_path = textToAudio(text) if text else ""
    return fastapi.Response(
        getAudioFromFile(audio_path) if audio_path != "" else "",
        media_type="audio/mpeg",
    )
-
-
@app.get("/mila/reqs")
def extract_reqs_ep(url: str, sourcetype: str = "html"):
    """Extract the requirement sentences from a given url.

    ``isOK`` only reports that ``getRequirements`` returned something other
    than None; an empty list still counts as OK.
    """
    requirements = getRequirements(url, sourcetype)
    payload = {"Content-Type": "application/json"}
    payload["isOK"] = requirements is not None
    payload["reqs"] = requirements
    return payload
-
-
@app.get("/mila/wiki")
def wiki_search_ep(term: str, summary: str = "none", audio: bool = False):
    """Search wikipedia for ``term`` and return the text or its audio.

    With ``audio`` false the answer is a dict carrying the text; otherwise
    the text is converted with ``textToAudio`` and sent as ``audio/mpeg``
    (with an empty body when no audio file was produced).
    """
    text = searchWikipedia(term, summary)
    if not audio:
        return {
            "Content-Type": "application/json",
            "isOK": text != "",
            "audio": "",
            "text": text,
        }
    audio_path = textToAudio(text)
    body = "" if audio_path == "" else getAudioFromFile(audio_path)
    return fastapi.Response(body, media_type="audio/mpeg")
-
-
@app.get("/mila/summ")
def summarize_ep(url: str, summary: str = "none", audio: bool = False):
    """Summarize the article at ``url`` and optionally turn it into audio.

    With ``audio`` false the answer is a dict carrying the text; otherwise
    the text is converted with ``textToAudio``, the file path is printed,
    and the file is sent as ``audio/mpeg`` (empty body when no file was
    produced).
    """
    text = summarizeLinkToAudio(url, summary)
    if not audio:
        return {
            "Content-Type": "application/json",
            "isOK": text != "",
            # "audio": "",
            "text": text,
        }
    audio_path = textToAudio(text)
    print(audio_path)
    body = "" if audio_path == "" else getAudioFromFile(audio_path)
    return fastapi.Response(body, media_type="audio/mpeg")
-
-
@app.get("/mila/mila")
def mila_ep(url: str, summary: str = "newspaper", audio: bool = False):
    """Summarize every link found at ``url``, optionally as audio.

    With ``audio`` false the answer is a dict carrying the text; otherwise
    the text is converted with ``textToAudio``, the file path is printed,
    and the file is sent as ``audio/mpeg`` (empty body when no file was
    produced).
    """
    text = summarizeLinksToAudio(url, summary)
    if not audio:
        return {
            "Content-Type": "application/json",
            "isOK": text != "",
            "audio": "",
            "text": text,
        }
    audio_path = textToAudio(text)
    print(audio_path)
    body = "" if audio_path == "" else getAudioFromFile(audio_path)
    return fastapi.Response(body, media_type="audio/mpeg")
-
-
@app.get("/mila/health")
def health_ep():
    """Liveness probe: always answers with ``isOK`` set to True."""
    status = {"Content-Type": "application/json"}
    status["isOK"] = True
    return status
-
-
@app.get("/mila/robots.txt")
def robots_ep():
    """Tell crawlers to stay away from every path.

    NOTE(review): the answer is a dict, not the plain-text robots.txt
    syntax; confirm whether any consumer relies on this shape before
    changing it.
    """
    return {
        "Content-Type": "application/json",
        "User-Agents": "*",
        "Disallow": "/",
    }