diff options
-rw-r--r-- | devourer/.pylintrc | 2 | ||||
-rw-r--r-- | devourer/devourer.py | 154 |
2 files changed, 76 insertions, 80 deletions
diff --git a/devourer/.pylintrc b/devourer/.pylintrc new file mode 100644 index 0000000..852fbeb --- /dev/null +++ b/devourer/.pylintrc @@ -0,0 +1,2 @@ +[BASIC] +good-names=e,i,j,k diff --git a/devourer/devourer.py b/devourer/devourer.py index 7ee2fb1..fbe3888 100644 --- a/devourer/devourer.py +++ b/devourer/devourer.py @@ -1,4 +1,5 @@ # _*_ coding=utf-8 _*_ +"""Personal knowledge aggregator.""" import contextlib import datetime @@ -44,7 +45,7 @@ def simple_get(url: str) -> bytes: if is_a_good_response(resp): content = resp.content except requests.exceptions.RequestException as e: - log_error("Error during requests to {0} : {1}".format(url, str(e))) + log_error(f"Error during requests to {0} : {1}".format(url, str(e))) finally: return content @@ -57,30 +58,29 @@ def get_with_params(url: str, params: dict) -> typing.Optional[dict]: ) as resp: if is_a_good_response(resp): return resp.json() - else: - return None + return None except requests.exceptions.RequestException as e: - log_error("Error during requests to {0} : {1}".format(url, str(e))) + log_error(f"Error during requests to {0} : {1}".format(url, str(e))) return None -def getRandStr(n): +def get_rand_str(count): """Return a random string of the given length.""" - return "".join([random.choice(string.lowercase) for i in range(n)]) + return "".join([random.choice(string.lowercase) for i in range(count)]) -def getURLS(source: str, summary: str) -> dict: +def get_urls(source: str, summary: str) -> dict: """Extracts the urls from a website.""" result = {} raw_ml = simple_get(source) ml = bs4.BeautifulSoup(raw_ml, "lxml") - rand_tmp = "/tmp/" + getRandStr(20) + rand_tmp = "/tmp/" + get_rand_str(20) ml_str = repr(ml) - tmp = open(rand_tmp, "w") + tmp = open(rand_tmp, "w", encoding="utf-8") tmp.write(ml_str) tmp.close() - tmp = open(rand_tmp, "r") + tmp = open(rand_tmp, "r", encoding="utf-8") url_list = [] for line in tmp: url = re.findall( @@ -116,9 +116,9 @@ def pdf_to_voice() -> str: """Main function for converting a pdf to an mp3.""" outfile = str() try: - rawText = tika.parser.from_file() - tts = gtts.gTTS(rawText["content"]) - outfile = getRandStr(20) + ".mp3" + raw_text = tika.parser.from_file() + tts = gtts.gTTS(raw_text["content"]) + outfile = get_rand_str(20) + ".mp3" tts.save(outfile) except Exception as e: logging.exception(e) @@ -126,13 +126,13 @@ def pdf_to_voice() -> str: return outfile -def extractRequirements(textBody: str) -> list: +def extract_requirements(text_body: str) -> list: """Extract the sentences containing the keywords that denote a requirement. the keywords are baed on ISO/IEC directives, part 2: https://www.iso.org/sites/directives/current/part2/index.xhtml """ result = [] - REQ_KEYWORDS = [ + req_keywords = [ "shall", "shall not", "should", @@ -142,9 +142,9 @@ def extractRequirements(textBody: str) -> list: "can", "cannot", ] - sentences = nltk.sent_tokenize(textBody) + sentences = nltk.sent_tokenize(text_body) for sentence in sentences: - for keyword in REQ_KEYWORDS: + for keyword in req_keywords: if sentence.casefold().find(keyword) >= 0: result.append(sanitize_text(sentence)) return result @@ -164,32 +164,29 @@ def extract_refs(url: str) -> list: def pdf_to_text(url: str) -> str: """Convert the PDF file to a string.""" - tikaResult = {} + tika_result = {} try: - with tempfile.NamedTemporaryFile(mode="w+b", delete=True) as tmpFile: + with tempfile.NamedTemporaryFile(mode="w+b", delete=True) as tmp_file: content = simple_get(url) if content is not None: - tmpFile.write(content) - tikaResult = tparser.from_file( - tmpFile.name, + tmp_file.write(content) + tika_result = tparser.from_file( + tmp_file.name, serverEndpoint=os.environ["TIKA_SERVER_ENDPOINT"], ) - # print(tikaResult["metadata"]) - # print(tikaResult["content"]) except Exception as e: logging.exception(e) finally: - if "content" in tikaResult: - return sanitize_text(tikaResult["content"]) - else: - return "" + if "content" in tika_result: + return sanitize_text(tika_result["content"]) + return "" # TODO-very performance-intensive def summarize_text(text: str) -> str: """Summarize the given text using bart.""" result = str() - # TODO move me later + # TODO-move me later transformers_summarizer = transformers.pipeline("summarization") try: sentences = text.split(".") @@ -211,8 +208,8 @@ def summarize_text(text: str) -> str: chunks.append(sentence.split(" ")) print(chunks) - for chunk_id in range(len(chunks)): - chunks[chunk_id] = "".join(chunks[chunk_id]) + for i, chunk in enumerate(chunks): + chunks[i] = "".join(chunk) print(chunks) summaries = transformers_summarizer( @@ -282,7 +279,7 @@ def text_to_audio(text: str) -> str: return path -def getRequirements(url: str, sourcetype: str) -> list: +def get_requirements(url: str, sourcetype: str) -> list: """Runs the single-link main function.""" result = str() results = [] @@ -290,18 +287,18 @@ def getRequirements(url: str, sourcetype: str) -> list: if sourcetype == "html": parser = newspaper.build(url) for article in parser.articles: - a = newspaper.Article(article.url) - a.download() - a.parse() - a.nlp() - doc = readability.Document(a.html) + art = newspaper.Article(article.url) + art.download() + art.parse() + art.nlp() + doc = readability.Document(art.html) print(doc) # print(doc.summary()) # results = extractRequirements(doc.summary()) - results = extractRequirements(doc) + results = extract_requirements(doc) elif sourcetype == "text": - bytesText = simple_get(url) - results = extractRequirements(bytesText.decode("utf-8")) + bytes_text = simple_get(url) + results = extract_requirements(bytes_text.decode("utf-8")) except Exception as e: logging.exception(e) finally: @@ -312,7 +309,7 @@ def getRequirements(url: str, sourcetype: str) -> list: # FIXME-summary=bart doesnt work -def summarizeLinkToAudio(url: str, summary: str) -> str: +def summarize_link_to_audio(url: str, summary: str) -> str: """Summarizes the text inside a given url into audio.""" result = str() try: @@ -337,16 +334,16 @@ def summarizeLinkToAudio(url: str, summary: str) -> str: # FIXME-change my name -def summarizeLinksToAudio(url: str, summary: str) -> str: +def summarize_links_to_audio(origin: str, summary: str) -> str: """Summarize a list of urls into audio files.""" results = [] result = str() try: config = newspaper.Config() config_news(config) - urls = getURLS(url, summary) + urls = get_urls(origin, summary) for url in urls: - results.append(summarizeLinkToAudio(url, summary)) + results.append(summarize_link_to_audio(url, summary)) except Exception as e: logging.exception(e) finally: @@ -354,24 +351,24 @@ def summarizeLinksToAudio(url: str, summary: str) -> str: return result -def searchWikipedia(search_term: str, summary: str) -> str: +def search_wikipedia(search_term: str, summary: str) -> str: """Search wikipedia for a string and return the url. reference: https://www.mediawiki.org/wiki/API:Opensearch. """ result = str() try: - searchParmas = { + search_params = { "action": "opensearch", "namespace": "0", "search": search_term, "limit": "10", "format": "json", } - res = get_with_params(os.environ["WIKI_SEARCH_URL"], searchParmas) + res = get_with_params(os.environ["WIKI_SEARCH_URL"], search_params) # FIXME-handle wiki redirects/disambiguations if res is not None: source = res[3][0] - result = summarizeLinkToAudio(source, summary) + result = summarize_link_to_audio(source, summary) result = sanitize_text(result) except Exception as e: logging.exception(e) @@ -385,17 +382,17 @@ def get_audio_from_file(audio_path: str) -> bytes: return audio.read() -""" -def getSentiments(detailed: bool) -> list: - results = list() - SOURCE = "https://github.com/coinpride/CryptoList" - urls = simpleGet(SOURCE) +# TODO- implement me +def get_sentiments(detailed: bool) -> list: + """Sentiments analysis.""" + results = [] + source = "https://github.com/coinpride/CryptoList" + urls = simple_get(source) classifier = transformers.pipeline("sentiment-analysis") for url in urls: - req_result = simpleGet(url) + req_result = simple_get(url) results.append(classifier(req_result)) return results -""" def get_keywords_from_text(text: str) -> typing.List[str]: @@ -485,7 +482,7 @@ def pdf_to_audio_ep(url: str): @app.get("/mila/reqs") def extract_reqs_ep(url: str, sourcetype: str = "html"): """Extracts the requirements from a given url.""" - result = getRequirements(url, sourcetype) + result = get_requirements(url, sourcetype) return { "Content-Type": "application/json", "isOK": bool(result), @@ -496,26 +493,25 @@ def extract_reqs_ep(url: str, sourcetype: str = "html"): @app.get("/mila/wiki") def wiki_search_ep(term: str, summary: str = "none", audio: bool = False): """Search and summarizes from wikipedia.""" - text = searchWikipedia(term, summary) + text = search_wikipedia(term, summary) if audio: audio_path = text_to_audio(text) return fastapi.Response( get_audio_from_file(audio_path) if audio_path != "" else "", media_type="audio/mpeg", ) - else: - return { - "Content-Type": "application/json", - "isOK": bool(text), - "audio": "", - "text": text, - } + return { + "Content-Type": "application/json", + "isOK": bool(text), + "audio": "", + "text": text, + } @app.get("/mila/summ") def summarize_ep(url: str, summary: str = "none", audio: bool = False): """Summarize and turn the summary into audio.""" - text = summarizeLinkToAudio(url, summary) + text = summarize_link_to_audio(url, summary) if audio: audio_path = text_to_audio(text) print(audio_path) @@ -523,19 +519,18 @@ def summarize_ep(url: str, summary: str = "none", audio: bool = False): get_audio_from_file(audio_path) if audio_path != "" else "", media_type="audio/mpeg", ) - else: - return { - "Content-Type": "application/json", - "isOK": bool(text), - # "audio": "", - "text": text, - } + return { + "Content-Type": "application/json", + "isOK": bool(text), + # "audio": "", + "text": text, + } @app.get("/mila/mila") def mila_ep(url: str, summary: str = "newspaper", audio: bool = False): """Extract all the urls and then summarize and turn into audio.""" - text = summarizeLinksToAudio(url, summary) + text = summarize_links_to_audio(url, summary) if audio: audio_path = text_to_audio(text) print(audio_path) @@ -543,13 +538,12 @@ def mila_ep(url: str, summary: str = "newspaper", audio: bool = False): get_audio_from_file(audio_path) if audio_path != "" else "", media_type="audio/mpeg", ) - else: - return { - "Content-Type": "application/json", - "isOK": bool(text), - "audio": "", - "text": text, - } + return { + "Content-Type": "application/json", + "isOK": bool(text), + "audio": "", + "text": text, + } @app.get("/mila/health") |