diff --git a/PFERD/crawlers/ilias.py b/PFERD/crawlers/ilias.py index 2352945..2d9a9c9 100644 --- a/PFERD/crawlers/ilias.py +++ b/PFERD/crawlers/ilias.py @@ -1,9 +1,15 @@ +import json +import re from configparser import SectionProxy +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum from pathlib import PurePath -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin, urlparse import aiohttp -from bs4 import BeautifulSoup +from bs4 import BeautifulSoup, Tag from PFERD.utils import soupify from ..authenticators import Authenticator @@ -41,6 +47,330 @@ class IliasCrawlerSection(CrawlerSection): return auth +class IliasElementType(Enum): + EXERCISE = "exercise" + FILE = "file" + FOLDER = "folder" + FORUM = "forum" + LINK = "link" + MEETING = "meeting" + VIDEO = "video" + VIDEO_PLAYER = "video_player" + VIDEO_FOLDER = "video_folder" + VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated" + + +@dataclass +class IliasPageElement: + type: IliasElementType + url: str + name: str + mtime: Optional[datetime] = None + query_parameter: Dict[str, str] = field(default_factory=dict) + + +class IliasPage: + + def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): + self._soup = soup + self._page_url = _page_url + self._page_type = source_element.type if source_element else None + self._source_name = source_element.name if source_element else "" + + def get_child_elements(self) -> List[IliasPageElement]: + """ + Return all child page elements you can find here. + """ + if self._is_video_player(): + return self._player_to_video() + if self._is_video_listing(): + return self._find_video_entries() + return self._find_normal_entries() + + def _is_video_player(self) -> bool: + return "paella_config_file" in str(self._soup) + + def _is_video_listing(self) -> bool: + if self._soup.find(id="headerimage"): + element: Tag = self._soup.find(id="headerimage") + if "opencast" in element.attrs["src"].lower(): + return True + return False + + def _player_to_video(self) -> List[IliasPageElement]: + # Fetch the actual video page. This is a small wrapper page initializing a javscript + # player. Sadly we can not execute that JS. The actual video stream url is nowhere + # on the page, but defined in a JS object inside a script tag, passed to the player + # library. + # We do the impossible and RegEx the stream JSON object out of the page's HTML source + regex: re.Pattern[str] = re.compile( + r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE + ) + json_match = regex.search(str(self._soup)) + + if json_match is None: + print(f"Could not find json stream info for {self._page_url!r}") + return [] + json_str = json_match.group(1) + + # parse it + json_object = json.loads(json_str) + # and fetch the video url! + video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"] + return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)] + + def _find_video_entries(self) -> List[IliasPageElement]: + # ILIAS has three stages for video pages + # 1. The initial dummy page without any videos. This page contains the link to the listing + # 2. The video listing which might be paginated + # 3. An unpaginated video listing (or at least one that includes 800 videos) + # + # We need to figure out where we are. + + video_element_table: Tag = self._soup.find( + name="table", id=re.compile(r"tbl_xoct_.+") + ) + + if video_element_table is None: + # We are in stage 1 + # The page is actually emtpy but contains the link to stage 2 + content_link: Tag = self._soup.select_one("#tab_series a") + url: str = self._abs_url_from_link(content_link) + query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} + return [IliasPageElement( + IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "", query_parameter=query_params + )] + + is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None + + if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER: + # We are in stage 2 - try to break pagination + return self._find_video_entries_paginated() + + return self._find_video_entries_no_paging() + + def _find_video_entries_paginated(self) -> List[IliasPageElement]: + table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + + if table_element is None: + # TODO: Properly log this + print( + "Could not increase elements per page (table not found)." + " Some might not be crawled!" + ) + return self._find_video_entries_no_paging() + + id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) + if id_match is None: + # TODO: Properly log this + print( + "Could not increase elements per page (table id not found)." + " Some might not be crawled!" + ) + return self._find_video_entries_no_paging() + + table_id = id_match.group(1) + + query_params = {f"tbl_xoct_{table_id}_trows": "800", + "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} + return [IliasPageElement( + IliasElementType.VIDEO_FOLDER, self._page_url, "", query_parameter=query_params + )] + + def _find_video_entries_no_paging(self) -> List[IliasPageElement]: + """ + Crawls the "second stage" video page. This page contains the actual video urls. + """ + # Video start links are marked with an "Abspielen" link + video_links: List[Tag] = self._soup.findAll( + name="a", text=re.compile(r"\s*Abspielen\s*") + ) + + results: List[IliasPageElement] = [] + + # TODO: Sadly the download button is currently broken, so never do that + for link in video_links: + results.append(self._listed_video_to_element(link)) + + return results + + def _listed_video_to_element(self, link: Tag) -> IliasPageElement: + # The link is part of a table with multiple columns, describing metadata. + # 6th child (1 indexed) is the modification time string + modification_string = link.parent.parent.parent.select_one( + "td.std:nth-child(6)" + ).getText().strip() + modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M") + + title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() + title += ".mp4" + + video_name: str = _sanitize_path_name(title) + + video_url = self._abs_url_from_link(link) + + return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time) + + def _find_normal_entries(self) -> List[IliasPageElement]: + result: List[IliasPageElement] = [] + + # Fetch all links and throw them to the general interpreter + links: List[Tag] = self._soup.select("a.il_ContainerItemTitle") + + for link in links: + abs_url = self._abs_url_from_link(link) + element_name = _sanitize_path_name(link.getText()) + element_type = self._find_type_from_link(element_name, link, abs_url) + + if not element_type: + continue + elif element_type == IliasElementType.MEETING: + element_path = _sanitize_path_name(self._normalize_meeting_name(element_name)) + elif element_type == IliasElementType.FILE: + result.append(self._file_to_element(element_name, abs_url, link)) + continue + + result.append(IliasPageElement(element_type, abs_url, element_name, None)) + + return result + + def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement: + # Files have a list of properties (type, modification date, size, etc.) + # In a series of divs. + # Find the parent containing all those divs, so we can filter our what we need + properties_parent: Tag = link_element.findParent( + "div", {"class": lambda x: "il_ContainerListItem" in x} + ).select_one(".il_ItemProperties") + # The first one is always the filetype + file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip() + + # The rest does not have a stable order. Grab the whole text and reg-ex the date + # out of it + all_properties_text = properties_parent.getText().strip() + modification_date_match = re.search( + r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)", + all_properties_text + ) + if modification_date_match is None: + modification_date = None + # TODO: Properly log this + print(f"Could not extract start date from {all_properties_text!r}") + else: + modification_date_str = modification_date_match.group(1) + modification_date = demangle_date(modification_date_str) + + # Grab the name from the link text + name = _sanitize_path_name(link_element.getText()) + full_path = name + "." + file_type + + return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date) + + @staticmethod + def _find_type_from_link( + element_name: str, + link_element: Tag, + url: str + ) -> Optional[IliasElementType]: + """ + Decides which sub crawler to use for a given top level element. + """ + parsed_url = urlparse(url) + + # file URLs contain "target=file" + if "target=file_" in parsed_url.query: + return IliasElementType.FILE + + # Skip forums + if "cmd=showThreads" in parsed_url.query: + return IliasElementType.FORUM + + # Everything with a ref_id can *probably* be opened to reveal nested things + # video groups, directories, exercises, etc + if "ref_id=" in parsed_url.query: + return IliasPage._find_type_from_folder_like(link_element, url) + + # TODO: Log this properly + print(f"Unknown type: The element was at {str(element_name)!r} and it is {link_element!r})") + return None + + @staticmethod + def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]: + """ + Try crawling something that looks like a folder. + """ + # pylint: disable=too-many-return-statements + + found_parent: Optional[Tag] = None + + # We look for the outer div of our inner link, to find information around it + # (mostly the icon) + for parent in link_element.parents: + if "ilContainerListItemOuter" in parent["class"]: + found_parent = parent + break + + if found_parent is None: + # TODO: Log this properly + print(f"Could not find element icon for {url!r}") + return None + + # Find the small descriptive icon to figure out the type + img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon") + + if img_tag is None: + # TODO: Log this properly + print(f"Could not find image tag for {url!r}") + return None + + if "opencast" in str(img_tag["alt"]).lower(): + return IliasElementType.VIDEO_FOLDER + + if str(img_tag["src"]).endswith("icon_exc.svg"): + return IliasElementType.EXERCISE + + if str(img_tag["src"]).endswith("icon_webr.svg"): + return IliasElementType.LINK + + if str(img_tag["src"]).endswith("frm.svg"): + return IliasElementType.FORUM + + if str(img_tag["src"]).endswith("sess.svg"): + return IliasElementType.MEETING + + return IliasElementType.FOLDER + + @staticmethod + def _normalize_meeting_name(meeting_name: str) -> str: + """ + Normalizes meeting names, which have a relative time as their first part, + to their date in ISO format. + """ + date_portion_str = meeting_name.split(" - ")[0] + date_portion = demangle_date(date_portion_str) + + if not date_portion: + return meeting_name + + rest_of_name = meeting_name + if rest_of_name.startswith(date_portion_str): + rest_of_name = rest_of_name[len(date_portion_str):] + + return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name + + def _abs_url_from_link(self, link_tag: Tag) -> str: + """ + Create an absolute url from an tag. + """ + return urljoin(self._page_url, link_tag.get("href")) + +def demangle_date(date_str: str) -> Optional[datetime]: + return None + + +def _sanitize_path_name(name: str) -> str: + return name.replace("/", "-").replace("\\", "-").strip() + + class IliasCrawler(HttpCrawler): def __init__( self, @@ -64,7 +394,9 @@ class IliasCrawler(HttpCrawler): async def crawl(self) -> None: async with self.crawl_bar(PurePath("/")) as bar: soup = await self._get_page(self._base_url) - self.print("[green]Gotcha![/]") + page = IliasPage(soup, self._base_url, None) + for element in page.get_child_elements(): + self.print(element.name + " " + str(element.type)) async def _get_page(self, url: str, retries_left: int = 3) -> BeautifulSoup: if retries_left < 0: