diff --git a/PFERD/crawlers/ilias/__init__.py b/PFERD/crawlers/ilias/__init__.py
new file mode 100644
index 0000000..15b8d5d
--- /dev/null
+++ b/PFERD/crawlers/ilias/__init__.py
@@ -0,0 +1,3 @@
+from .kit_web_ilias_crawler import KitIliasCrawler, KitIliasCrawlerSection
+
+__all__ = ["KitIliasCrawler", "KitIliasCrawlerSection"]
diff --git a/PFERD/crawlers/ilias/kit_ilias_html.py b/PFERD/crawlers/ilias/kit_ilias_html.py
new file mode 100644
index 0000000..17eb855
--- /dev/null
+++ b/PFERD/crawlers/ilias/kit_ilias_html.py
@@ -0,0 +1,452 @@
+import json
+import re
+from dataclasses import dataclass
+from datetime import date, datetime, timedelta
+from enum import Enum
+from typing import List, Optional, Union
+from urllib.parse import urljoin, urlparse
+
+from bs4 import BeautifulSoup, Tag
+
+from PFERD.utils import url_set_query_params
+
+TargetType = Union[str, int]
+
+
+class IliasElementType(Enum):
+    EXERCISE = "exercise"
+    FILE = "file"
+    FOLDER = "folder"
+    FORUM = "forum"
+    LINK = "link"
+    MEETING = "meeting"
+    VIDEO = "video"
+    VIDEO_PLAYER = "video_player"
+    VIDEO_FOLDER = "video_folder"
+    VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
+
+
+@dataclass
+class IliasPageElement:
+    type: IliasElementType
+    url: str
+    name: str
+    mtime: Optional[datetime] = None
+    description: Optional[str] = None
+
+
+class IliasPage:
+
+    def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
+        self._soup = soup
+        self._page_url = _page_url
+        self._page_type = source_element.type if source_element else None
+        self._source_name = source_element.name if source_element else ""
+
+    def get_child_elements(self) -> List[IliasPageElement]:
+        """
+        Return all child page elements you can find here.
+        """
+        if self._is_video_player():
+            return self._player_to_video()
+        if self._is_video_listing():
+            return self._find_video_entries()
+        if self._is_exercise_file():
+            return self._find_exercise_entries()
+        return self._find_normal_entries()
+
+    def _is_video_player(self) -> bool:
+        return "paella_config_file" in str(self._soup)
+
+    def _is_video_listing(self) -> bool:
+        # Video listing embedded in the usual ILIAS fluff
+        if self._soup.find(id="headerimage"):
+            element: Tag = self._soup.find(id="headerimage")
+            if "opencast" in element.attrs["src"].lower():
+                return True
+
+        # Raw listing without ILIAS fluff
+        video_element_table: Tag = self._soup.find(
+            name="table", id=re.compile(r"tbl_xoct_.+")
+        )
+        return video_element_table is not None
+
+    def _is_exercise_file(self) -> bool:
+        # We already know the type from the parent element
+        if self._page_type == IliasElementType.EXERCISE:
+            return True
+
+        # We have no suitable parent - let's guess
+        if self._soup.find(id="headerimage"):
+            element: Tag = self._soup.find(id="headerimage")
+            if "exc" in element.attrs["src"].lower():
+                return True
+
+        return False
+
+    def _player_to_video(self) -> List[IliasPageElement]:
+        # Fetch the actual video page. This is a small wrapper page initializing a javascript
+        # player. Sadly we cannot execute that JS. The actual video stream URL is nowhere
+        # on the page, but defined in a JS object inside a script tag, passed to the player
+        # library.
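+        #
+        # For reference, the embedded JSON looks roughly like this (abbreviated; the exact
+        # shape is an assumption inferred from the key accesses below):
+        #   {"streams": [{"sources": {"mp4": [{"src": "https://...mp4"}]}}], ...}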
+        # We do the impossible and RegEx the stream JSON object out of the page's HTML source
+        regex: re.Pattern[str] = re.compile(
+            r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
+        )
+        json_match = regex.search(str(self._soup))
+
+        if json_match is None:
+            print(f"Could not find json stream info for {self._page_url!r}")
+            return []
+        json_str = json_match.group(1)
+
+        # parse it
+        json_object = json.loads(json_str)
+        # and fetch the video URL!
+        video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
+        return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
+
+    def _find_video_entries(self) -> List[IliasPageElement]:
+        # ILIAS has three stages for video pages
+        # 1. The initial dummy page without any videos. This page contains the link to the listing
+        # 2. The video listing which might be paginated
+        # 3. An unpaginated video listing (or at least one that includes 800 videos)
+        #
+        # We need to figure out where we are.
+
+        video_element_table: Tag = self._soup.find(
+            name="table", id=re.compile(r"tbl_xoct_.+")
+        )
+
+        if video_element_table is None:
+            # We are in stage 1
+            # The page is actually empty but contains the link to stage 2
+            content_link: Tag = self._soup.select_one("#tab_series a")
+            url: str = self._abs_url_from_link(content_link)
+            query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
+            url = url_set_query_params(url, query_params)
+            return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
+
+        is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
+
+        if is_paginated and self._page_type != IliasElementType.VIDEO_FOLDER:
+            # We are in stage 2 - try to break pagination
+            return self._find_video_entries_paginated()
+
+        return self._find_video_entries_no_paging()
+
+    def _find_video_entries_paginated(self) -> List[IliasPageElement]:
+        table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
+
+        if table_element is None:
+            # TODO: Properly log this
+            print(
+                "Could not increase elements per page (table not found)."
+                " Some might not be crawled!"
+            )
+            return self._find_video_entries_no_paging()
+
+        id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
+        if id_match is None:
+            # TODO: Properly log this
+            print(
+                "Could not increase elements per page (table id not found)."
+                " Some might not be crawled!"
+            )
+            return self._find_video_entries_no_paging()
+
+        table_id = id_match.group(1)
+
+        query_params = {f"tbl_xoct_{table_id}_trows": "800",
+                        "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
+        url = url_set_query_params(self._page_url, query_params)
+        return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
+
+    def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
+        """
+        Crawls the "second stage" video page. This page contains the actual video URLs.
+        """
+        # Video start links are marked with an "Abspielen" link
+        video_links: List[Tag] = self._soup.findAll(
+            name="a", text=re.compile(r"\s*Abspielen\s*")
+        )
+
+        results: List[IliasPageElement] = []
+
+        # TODO: Sadly the download button is currently broken, so never do that
+        for link in video_links:
+            results.append(self._listed_video_to_element(link))
+
+        return results
+
+    def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
+        # The link is part of a table with multiple columns, describing metadata.
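+        # A row is assumed to look roughly like this (column numbers inferred from the
+        # nth-child selectors below):
+        #   column 3: title, e.g. "Vorlesung 01"
+        #   column 6: modification time, e.g. "21.04.2020 - 18:30"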
+        # 6th child (1 indexed) is the modification time string
+        modification_string = link.parent.parent.parent.select_one(
+            "td.std:nth-child(6)"
+        ).getText().strip()
+        modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
+
+        title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
+        title += ".mp4"
+
+        video_name: str = _sanitize_path_name(title)
+
+        video_url = self._abs_url_from_link(link)
+
+        return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
+
+    def _find_exercise_entries(self) -> List[IliasPageElement]:
+        results: List[IliasPageElement] = []
+
+        # Each assignment is in an accordion container
+        assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer")
+
+        for container in assignment_containers:
+            # Fetch the container name out of the header to use it in the path
+            container_name = container.select_one(".ilAssignmentHeader").getText().strip()
+            # Find all download links in the container (this will contain all the files)
+            files: List[Tag] = container.findAll(
+                name="a",
+                # download links contain the given command class
+                attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
+                text="Download"
+            )
+
+            # Grab each file as you now have the link
+            for file_link in files:
+                # Two divs, side by side. Left is the name, right is the link ==> get left
+                # sibling
+                file_name = file_link.parent.findPrevious(name="div").getText().strip()
+                file_name = _sanitize_path_name(file_name)
+                url = self._abs_url_from_link(file_link)
+
+                results.append(IliasPageElement(
+                    IliasElementType.FILE,
+                    url,
+                    container_name + "/" + file_name,
+                    None  # We do not have any timestamp
+                ))
+
+        return results
+
+    def _find_normal_entries(self) -> List[IliasPageElement]:
+        result: List[IliasPageElement] = []
+
+        # Fetch all links and throw them to the general interpreter
+        links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")
+
+        for link in links:
+            abs_url = self._abs_url_from_link(link)
+            element_name = _sanitize_path_name(link.getText())
+            element_type = self._find_type_from_link(element_name, link, abs_url)
+            description = self._find_link_description(link)
+
+            if not element_type:
+                continue
+            if element_type == IliasElementType.MEETING:
+                element_name = _sanitize_path_name(self._normalize_meeting_name(element_name))
+            elif element_type == IliasElementType.FILE:
+                result.append(self._file_to_element(element_name, abs_url, link))
+                continue
+
+            result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
+
+        return result
+
+    def _find_link_description(self, link: Tag) -> Optional[str]:
+        tile: Tag = link.findParent("div", {"class": lambda x: x and "il_ContainerListItem" in x})
+        if not tile:
+            return None
+        description_element: Tag = tile.find("div", {"class": lambda x: x and "il_Description" in x})
+        if not description_element:
+            return None
+        return description_element.getText().strip()
+
+    def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement:
+        # Files have a list of properties (type, modification date, size, etc.)
+        # in a series of divs.
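+        # The surrounding markup is assumed to look roughly like this (the selectors
+        # below are the source of truth):
+        #   <div class="il_ContainerListItem">
+        #     <a class="il_ContainerItemTitle" href="...">Some file</a>
+        #     <div class="il_ItemProperties">
+        #       <span class="il_ItemProperty">pdf</span>
+        #       <span class="il_ItemProperty">Heute, 13:37</span>
+        #     </div>
+        #   </div>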
+        # Find the parent containing all those divs, so we can filter out what we need
+        properties_parent: Tag = link_element.findParent(
+            "div", {"class": lambda x: x and "il_ContainerListItem" in x}
+        ).select_one(".il_ItemProperties")
+        # The first one is always the filetype
+        file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()
+
+        # The rest does not have a stable order. Grab the whole text and reg-ex the date
+        # out of it
+        all_properties_text = properties_parent.getText().strip()
+        modification_date_match = re.search(
+            r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
+            all_properties_text
+        )
+        if modification_date_match is None:
+            modification_date = None
+            # TODO: Properly log this
+            print(f"Could not extract start date from {all_properties_text!r}")
+        else:
+            modification_date_str = modification_date_match.group(1)
+            modification_date = demangle_date(modification_date_str)
+
+        # Grab the name from the link text
+        name = _sanitize_path_name(link_element.getText())
+        full_path = name + "." + file_type
+
+        return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
+
+    @staticmethod
+    def _find_type_from_link(
+            element_name: str,
+            link_element: Tag,
+            url: str
+    ) -> Optional[IliasElementType]:
+        """
+        Decides which sub-crawler to use for a given top-level element.
+        """
+        parsed_url = urlparse(url)
+
+        # file URLs contain "target=file_"
+        if "target=file_" in parsed_url.query:
+            return IliasElementType.FILE
+
+        # Skip forums
+        if "cmd=showThreads" in parsed_url.query:
+            return IliasElementType.FORUM
+
+        # Everything with a ref_id can *probably* be opened to reveal nested things
+        # video groups, directories, exercises, etc.
+        if "ref_id=" in parsed_url.query:
+            return IliasPage._find_type_from_folder_like(link_element, url)
+
+        # TODO: Log this properly
+        print(f"Unknown type: The element was at {str(element_name)!r} and it is {link_element!r}")
+        return None
+
+    @staticmethod
+    def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]:
+        """
+        Try crawling something that looks like a folder.
+        """
+        # pylint: disable=too-many-return-statements
+
+        found_parent: Optional[Tag] = None
+
+        # We look for the outer div of our inner link, to find information around it
+        # (mostly the icon)
+        for parent in link_element.parents:
+            if "ilContainerListItemOuter" in parent["class"]:
+                found_parent = parent
+                break
+
+        if found_parent is None:
+            # TODO: Log this properly
+            print(f"Could not find element icon for {url!r}")
+            return None
+
+        # Find the small descriptive icon to figure out the type
+        img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")
+
+        if img_tag is None:
+            # TODO: Log this properly
+            print(f"Could not find image tag for {url!r}")
+            return None
+
+        if "opencast" in str(img_tag["alt"]).lower():
+            return IliasElementType.VIDEO_FOLDER
+
+        if str(img_tag["src"]).endswith("icon_exc.svg"):
+            return IliasElementType.EXERCISE
+
+        if str(img_tag["src"]).endswith("icon_webr.svg"):
+            return IliasElementType.LINK
+
+        if str(img_tag["src"]).endswith("frm.svg"):
+            return IliasElementType.FORUM
+
+        if str(img_tag["src"]).endswith("sess.svg"):
+            return IliasElementType.MEETING
+
+        return IliasElementType.FOLDER
+
+    @staticmethod
+    def _normalize_meeting_name(meeting_name: str) -> str:
+        """
+        Normalizes meeting names, which have a relative time as their first part,
+        to their date in ISO format.
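+
+        A hypothetical example (the resolved date depends on the current day):
+        "Heute, 10:00 - Besprechung" -> "<today as YYYY-MM-DD>, 10:00 - Besprechung"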
+        """
+        date_portion_str = meeting_name.split(" - ")[0]
+        date_portion = demangle_date(date_portion_str)
+
+        if not date_portion:
+            return meeting_name
+
+        rest_of_name = meeting_name
+        if rest_of_name.startswith(date_portion_str):
+            rest_of_name = rest_of_name[len(date_portion_str):]
+
+        return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name
+
+    def _abs_url_from_link(self, link_tag: Tag) -> str:
+        """
+        Create an absolute URL from an <a> tag.
+        """
+        return urljoin(self._page_url, link_tag.get("href"))
+
+
+german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez']
+english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
+
+
+def demangle_date(date_str: str) -> Optional[datetime]:
+    """
+    Demangle a given date in one of the following formats:
+    "Gestern, HH:MM"
+    "Heute, HH:MM"
+    "Morgen, HH:MM"
+    "dd. mon yyyy, HH:MM"
+    """
+    try:
+        date_str = re.sub(r"\s+", " ", date_str)
+        date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, flags=re.I)
+        date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, flags=re.I)
+        date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, flags=re.I)
+        for german, english in zip(german_months, english_months):
+            date_str = date_str.replace(german, english)
+            # Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
+            date_str = date_str.replace(english + ".", english)
+
+        # We now have a nice English string in the format: "dd. mmm yyyy, hh:mm"
+        day_part, time_part = date_str.split(",")
+        day_str, month_str, year_str = day_part.split(" ")
+
+        day = int(day_str.strip().replace(".", ""))
+        month = english_months.index(month_str.strip()) + 1
+        year = int(year_str.strip())
+
+        hour_str, minute_str = time_part.split(":")
+        hour = int(hour_str)
+        minute = int(minute_str)
+
+        return datetime(year, month, day, hour, minute)
+    except Exception:
+        # TODO: Properly log this
+        print(f"Could not parse date {date_str!r}")
+        return None
+
+
+def _format_date_english(date_to_format: date) -> str:
+    month = english_months[date_to_format.month - 1]
+    return f"{date_to_format.day:02d}. {month} {date_to_format.year:04d}"
+
+
+def _yesterday() -> date:
+    return date.today() - timedelta(days=1)
+
+
+def _tomorrow() -> date:
+    return date.today() + timedelta(days=1)
+
+
+def _sanitize_path_name(name: str) -> str:
+    return name.replace("/", "-").replace("\\", "-").strip()
diff --git a/PFERD/crawlers/ilias.py b/PFERD/crawlers/ilias/kit_web_ilias_crawler.py
similarity index 51%
rename from PFERD/crawlers/ilias.py
rename to PFERD/crawlers/ilias/kit_web_ilias_crawler.py
index be3584c..be613e6 100644
--- a/PFERD/crawlers/ilias.py
+++ b/PFERD/crawlers/ilias/kit_web_ilias_crawler.py
@@ -1,23 +1,19 @@
 import asyncio
-import json
 import re
-from dataclasses import dataclass
-from datetime import date, datetime, timedelta
-from enum import Enum
 from pathlib import PurePath
 # TODO In Python 3.9 and above, AsyncContextManager is deprecated
-from typing import Any, Dict, List, Optional, Set, Union
-from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlsplit, urlunsplit
+from typing import Any, Dict, Optional, Set, Union
 
 import aiohttp
 from bs4 import BeautifulSoup, Tag
 
+from PFERD.authenticators import Authenticator
+from PFERD.config import Config
+from PFERD.crawler import CrawlerSection, HttpCrawler, anoncritical, arepeat
 from PFERD.output_dir import Redownload
-from PFERD.utils import soupify
+from PFERD.utils import soupify, url_set_query_param
 
-from ..authenticators import Authenticator
-from ..config import Config
-from ..crawler import CrawlerSection, HttpCrawler, anoncritical, arepeat
+from .kit_ilias_html import IliasElementType, IliasPage, IliasPageElement
 
 TargetType = Union[str, int]
 
@@ -58,465 +54,6 @@ class KitIliasCrawlerSection(CrawlerSection):
         return self.s.getboolean("link_file_plain_text", fallback=False)
 
 
-class IliasElementType(Enum):
-    EXERCISE = "exercise"
-    FILE = "file"
-    FOLDER = "folder"
-    FORUM = "forum"
-    LINK = "link"
-    MEETING = "meeting"
-    VIDEO = "video"
-    VIDEO_PLAYER = "video_player"
-    VIDEO_FOLDER = "video_folder"
-    VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
-
-
-@dataclass
-class IliasPageElement:
-    type: IliasElementType
-    url: str
-    name: str
-    mtime: Optional[datetime] = None
-    description: Optional[str] = None
-
-
-class IliasPage:
-
-    def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
-        self._soup = soup
-        self._page_url = _page_url
-        self._page_type = source_element.type if source_element else None
-        self._source_name = source_element.name if source_element else ""
-
-    def get_child_elements(self) -> List[IliasPageElement]:
-        """
-        Return all child page elements you can find here.
- """ - if self._is_video_player(): - return self._player_to_video() - if self._is_video_listing(): - return self._find_video_entries() - if self._is_exercise_file(): - return self._find_exercise_entries() - return self._find_normal_entries() - - def _is_video_player(self) -> bool: - return "paella_config_file" in str(self._soup) - - def _is_video_listing(self) -> bool: - # ILIAS fluff around it - if self._soup.find(id="headerimage"): - element: Tag = self._soup.find(id="headerimage") - if "opencast" in element.attrs["src"].lower(): - return True - - # Raw listing without ILIAS fluff - video_element_table: Tag = self._soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) - return video_element_table is not None - - def _is_exercise_file(self) -> bool: - # we know it from before - if self._page_type == IliasElementType.EXERCISE: - return True - - # We have no suitable parent - let's guesss - if self._soup.find(id="headerimage"): - element: Tag = self._soup.find(id="headerimage") - if "exc" in element.attrs["src"].lower(): - return True - - return False - - def _player_to_video(self) -> List[IliasPageElement]: - # Fetch the actual video page. This is a small wrapper page initializing a javscript - # player. Sadly we can not execute that JS. The actual video stream url is nowhere - # on the page, but defined in a JS object inside a script tag, passed to the player - # library. - # We do the impossible and RegEx the stream JSON object out of the page's HTML source - regex: re.Pattern[str] = re.compile( - r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE - ) - json_match = regex.search(str(self._soup)) - - if json_match is None: - print(f"Could not find json stream info for {self._page_url!r}") - return [] - json_str = json_match.group(1) - - # parse it - json_object = json.loads(json_str) - # and fetch the video url! - video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"] - return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)] - - def _find_video_entries(self) -> List[IliasPageElement]: - # ILIAS has three stages for video pages - # 1. The initial dummy page without any videos. This page contains the link to the listing - # 2. The video listing which might be paginated - # 3. An unpaginated video listing (or at least one that includes 800 videos) - # - # We need to figure out where we are. - - video_element_table: Tag = self._soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) - - if video_element_table is None: - # We are in stage 1 - # The page is actually emtpy but contains the link to stage 2 - content_link: Tag = self._soup.select_one("#tab_series a") - url: str = self._abs_url_from_link(content_link) - query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} - url = _url_set_query_params(url, query_params) - return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")] - - is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None - - if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER: - # We are in stage 2 - try to break pagination - return self._find_video_entries_paginated() - - return self._find_video_entries_no_paging() - - def _find_video_entries_paginated(self) -> List[IliasPageElement]: - table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) - - if table_element is None: - # TODO: Properly log this - print( - "Could not increase elements per page (table not found)." - " Some might not be crawled!" 
- ) - return self._find_video_entries_no_paging() - - id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) - if id_match is None: - # TODO: Properly log this - print( - "Could not increase elements per page (table id not found)." - " Some might not be crawled!" - ) - return self._find_video_entries_no_paging() - - table_id = id_match.group(1) - - query_params = {f"tbl_xoct_{table_id}_trows": "800", - "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} - url = _url_set_query_params(self._page_url, query_params) - return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")] - - def _find_video_entries_no_paging(self) -> List[IliasPageElement]: - """ - Crawls the "second stage" video page. This page contains the actual video urls. - """ - # Video start links are marked with an "Abspielen" link - video_links: List[Tag] = self._soup.findAll( - name="a", text=re.compile(r"\s*Abspielen\s*") - ) - - results: List[IliasPageElement] = [] - - # TODO: Sadly the download button is currently broken, so never do that - for link in video_links: - results.append(self._listed_video_to_element(link)) - - return results - - def _listed_video_to_element(self, link: Tag) -> IliasPageElement: - # The link is part of a table with multiple columns, describing metadata. - # 6th child (1 indexed) is the modification time string - modification_string = link.parent.parent.parent.select_one( - "td.std:nth-child(6)" - ).getText().strip() - modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M") - - title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() - title += ".mp4" - - video_name: str = _sanitize_path_name(title) - - video_url = self._abs_url_from_link(link) - - return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time) - - def _find_exercise_entries(self) -> List[IliasPageElement]: - results: List[IliasPageElement] = [] - - # Each assignment is in an accordion container - assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer") - - for container in assignment_containers: - # Fetch the container name out of the header to use it in the path - container_name = container.select_one(".ilAssignmentHeader").getText().strip() - # Find all download links in the container (this will contain all the files) - files: List[Tag] = container.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, - text="Download" - ) - - # Grab each file as you now have the link - for file_link in files: - # Two divs, side by side. 
Left is the name, right is the link ==> get left - # sibling - file_name = file_link.parent.findPrevious(name="div").getText().strip() - file_name = _sanitize_path_name(file_name) - url = self._abs_url_from_link(file_link) - - results.append(IliasPageElement( - IliasElementType.FILE, - url, - container_name + "/" + file_name, - None # We do not have any timestamp - )) - - return results - - def _find_normal_entries(self) -> List[IliasPageElement]: - result: List[IliasPageElement] = [] - - # Fetch all links and throw them to the general interpreter - links: List[Tag] = self._soup.select("a.il_ContainerItemTitle") - - for link in links: - abs_url = self._abs_url_from_link(link) - element_name = _sanitize_path_name(link.getText()) - element_type = self._find_type_from_link(element_name, link, abs_url) - description = self._find_link_description(link) - - if not element_type: - continue - if element_type == IliasElementType.MEETING: - element_name = _sanitize_path_name(self._normalize_meeting_name(element_name)) - elif element_type == IliasElementType.FILE: - result.append(self._file_to_element(element_name, abs_url, link)) - continue - - result.append(IliasPageElement(element_type, abs_url, element_name, description=description)) - - return result - - def _find_link_description(self, link: Tag) -> Optional[str]: - tile: Tag = link.findParent("div", {"class": lambda x: x and "il_ContainerListItem" in x}) - if not tile: - return None - description_element: Tag = tile.find("div", {"class": lambda x: x and "il_Description" in x}) - if not description_element: - return None - return description_element.getText().strip() - - def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement: - # Files have a list of properties (type, modification date, size, etc.) - # In a series of divs. - # Find the parent containing all those divs, so we can filter our what we need - properties_parent: Tag = link_element.findParent( - "div", {"class": lambda x: "il_ContainerListItem" in x} - ).select_one(".il_ItemProperties") - # The first one is always the filetype - file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip() - - # The rest does not have a stable order. Grab the whole text and reg-ex the date - # out of it - all_properties_text = properties_parent.getText().strip() - modification_date_match = re.search( - r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)", - all_properties_text - ) - if modification_date_match is None: - modification_date = None - # TODO: Properly log this - print(f"Could not extract start date from {all_properties_text!r}") - else: - modification_date_str = modification_date_match.group(1) - modification_date = demangle_date(modification_date_str) - - # Grab the name from the link text - name = _sanitize_path_name(link_element.getText()) - full_path = name + "." + file_type - - return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date) - - @staticmethod - def _find_type_from_link( - element_name: str, - link_element: Tag, - url: str - ) -> Optional[IliasElementType]: - """ - Decides which sub crawler to use for a given top level element. 
- """ - parsed_url = urlparse(url) - - # file URLs contain "target=file" - if "target=file_" in parsed_url.query: - return IliasElementType.FILE - - # Skip forums - if "cmd=showThreads" in parsed_url.query: - return IliasElementType.FORUM - - # Everything with a ref_id can *probably* be opened to reveal nested things - # video groups, directories, exercises, etc - if "ref_id=" in parsed_url.query: - return IliasPage._find_type_from_folder_like(link_element, url) - - # TODO: Log this properly - print(f"Unknown type: The element was at {str(element_name)!r} and it is {link_element!r})") - return None - - @staticmethod - def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]: - """ - Try crawling something that looks like a folder. - """ - # pylint: disable=too-many-return-statements - - found_parent: Optional[Tag] = None - - # We look for the outer div of our inner link, to find information around it - # (mostly the icon) - for parent in link_element.parents: - if "ilContainerListItemOuter" in parent["class"]: - found_parent = parent - break - - if found_parent is None: - # TODO: Log this properly - print(f"Could not find element icon for {url!r}") - return None - - # Find the small descriptive icon to figure out the type - img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon") - - if img_tag is None: - # TODO: Log this properly - print(f"Could not find image tag for {url!r}") - return None - - if "opencast" in str(img_tag["alt"]).lower(): - return IliasElementType.VIDEO_FOLDER - - if str(img_tag["src"]).endswith("icon_exc.svg"): - return IliasElementType.EXERCISE - - if str(img_tag["src"]).endswith("icon_webr.svg"): - return IliasElementType.LINK - - if str(img_tag["src"]).endswith("frm.svg"): - return IliasElementType.FORUM - - if str(img_tag["src"]).endswith("sess.svg"): - return IliasElementType.MEETING - - return IliasElementType.FOLDER - - @staticmethod - def _normalize_meeting_name(meeting_name: str) -> str: - """ - Normalizes meeting names, which have a relative time as their first part, - to their date in ISO format. - """ - date_portion_str = meeting_name.split(" - ")[0] - date_portion = demangle_date(date_portion_str) - - if not date_portion: - return meeting_name - - rest_of_name = meeting_name - if rest_of_name.startswith(date_portion_str): - rest_of_name = rest_of_name[len(date_portion_str):] - - return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name - - def _abs_url_from_link(self, link_tag: Tag) -> str: - """ - Create an absolute url from an tag. - """ - return urljoin(self._page_url, link_tag.get("href")) - - -german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez'] -english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] - - -def demangle_date(date_str: str) -> Optional[datetime]: - """ - Demangle a given date in one of the following formats: - "Gestern, HH:MM" - "Heute, HH:MM" - "Morgen, HH:MM" - "dd. mon yyyy, HH:MM - """ - try: - date_str = re.sub(r"\s+", " ", date_str) - date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I) - date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I) - date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I) - for german, english in zip(german_months, english_months): - date_str = date_str.replace(german, english) - # Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. 
Apr 2020" - date_str = date_str.replace(english + ".", english) - - # We now have a nice english String in the format: "dd. mmm yyyy, hh:mm" - day_part, time_part = date_str.split(",") - day_str, month_str, year_str = day_part.split(" ") - - day = int(day_str.strip().replace(".", "")) - month = english_months.index(month_str.strip()) + 1 - year = int(year_str.strip()) - - hour_str, minute_str = time_part.split(":") - hour = int(hour_str) - minute = int(minute_str) - - return datetime(year, month, day, hour, minute) - except Exception: - # TODO: Properly log this - print(f"Could not parse date {date_str!r}") - return None - - -def _format_date_english(date_to_format: date) -> str: - month = english_months[date_to_format.month - 1] - return f"{date_to_format.day:02d}. {month} {date_to_format.year:04d}" - - -def _yesterday() -> date: - return date.today() - timedelta(days=1) - - -def _tomorrow() -> date: - return date.today() + timedelta(days=1) - - -def _sanitize_path_name(name: str) -> str: - return name.replace("/", "-").replace("\\", "-").strip() - - -def _url_set_query_param(url: str, param: str, value: str) -> str: - """ - Set a query parameter in an url, overwriting existing ones with the same name. - """ - scheme, netloc, path, query, fragment = urlsplit(url) - query_parameters = parse_qs(query) - query_parameters[param] = [value] - new_query_string = urlencode(query_parameters, doseq=True) - - return urlunsplit((scheme, netloc, path, new_query_string, fragment)) - - -def _url_set_query_params(url: str, params: Dict[str, str]) -> str: - result = url - - for key, val in params.items(): - result = _url_set_query_param(result, key, val) - - return result - - _DIRECTORY_PAGES: Set[IliasElementType] = set([ IliasElementType.EXERCISE, IliasElementType.FOLDER, @@ -559,7 +96,7 @@ class KitIliasCrawler(HttpCrawler): async def _crawl_course(self, course_id: int) -> None: # Start crawling at the given course - root_url = _url_set_query_param( + root_url = url_set_query_param( self._base_url + "/goto.php", "target", f"crs_{course_id}" ) diff --git a/PFERD/utils.py b/PFERD/utils.py index 0b3d40d..56d6f53 100644 --- a/PFERD/utils.py +++ b/PFERD/utils.py @@ -6,7 +6,8 @@ import sys from abc import ABC, abstractmethod from contextlib import AsyncExitStack from types import TracebackType -from typing import Any, Callable, Generic, Optional, Type, TypeVar +from typing import Any, Callable, Dict, Generic, Optional, Type, TypeVar +from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit import bs4 @@ -38,6 +39,30 @@ def soupify(data: bytes) -> bs4.BeautifulSoup: return bs4.BeautifulSoup(data, "html.parser") +def url_set_query_param(url: str, param: str, value: str) -> str: + """ + Set a query parameter in an url, overwriting existing ones with the same name. + """ + scheme, netloc, path, query, fragment = urlsplit(url) + query_parameters = parse_qs(query) + query_parameters[param] = [value] + new_query_string = urlencode(query_parameters, doseq=True) + + return urlunsplit((scheme, netloc, path, new_query_string, fragment)) + + +def url_set_query_params(url: str, params: Dict[str, str]) -> str: + """ + Sets multiple query parameters in an url, overwriting existing ones. + """ + result = url + + for key, val in params.items(): + result = url_set_query_param(result, key, val) + + return result + + async def prompt_yes_no(query: str, default: Optional[bool]) -> bool: """ Asks the user a yes/no question and returns their choice.
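
A quick usage sketch of the new helpers in PFERD/utils.py (the URL below is made up for illustration):

    from PFERD.utils import url_set_query_params

    url = url_set_query_params(
        "https://example.com/goto.php?target=crs_123",
        {"limit": "800", "cmdMode": "asynch"},
    )
    # -> "https://example.com/goto.php?target=crs_123&limit=800&cmdMode=asynch"

Parameters that already exist in the URL are overwritten rather than duplicated, which is what the video listing code relies on when it re-requests a page with a higher row limit.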