"""Parsing of ILIAS HTML pages into crawlable page elements (PFERD)."""

import json
import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Union, cast
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup, Tag

from PFERD.logging import log
from PFERD.utils import url_set_query_params

TargetType = Union[str, int]


class IliasElementType(Enum):
    """The kinds of ILIAS page elements this parser distinguishes."""
    EXERCISE = "exercise"
    EXERCISE_FILES = "exercise_files"  # own submitted files
    TEST = "test"  # an online test. Will be ignored currently.
    FILE = "file"
    FOLDER = "folder"
    FORUM = "forum"
    LINK = "link"
    INFO_TAB = "info_tab"
    LEARNING_MODULE = "learning_module"
    BOOKING = "booking"
    MEETING = "meeting"
    SURVEY = "survey"
    MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
    MEDIACAST_VIDEO = "mediacast_video"
    OPENCAST_VIDEO = "opencast_video"
    OPENCAST_VIDEO_PLAYER = "opencast_video_player"
    OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
    OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"


@dataclass
class IliasPageElement:
    """A single link/item found on an ILIAS page."""
    type: IliasElementType
    url: str
    name: str
    mtime: Optional[datetime] = None
    description: Optional[str] = None

    def id(self) -> str:
        """
        Extract a stable identifier from this element's URL.

        Tries a list of known URL patterns (each with a named group "id");
        falls back to the full URL (with a warning) if none matches.
        """
        regexes = [
            r"eid=(?P<id>[0-9a-z\-]+)",
            r"file_(?P<id>\d+)",
            r"ref_id=(?P<id>\d+)",
            r"target=[a-z]+_(?P<id>\d+)",
            r"mm_(?P<id>\d+)"
        ]

        for regex in regexes:
            if match := re.search(regex, self.url):
                return match.groupdict()["id"]

        # Fall back to URL
        log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
        return self.url


@dataclass
class IliasDownloadForumData:
    """POST target and form payload for exporting forum threads."""
    url: str
    form_data: Dict[str, Union[str, List[str]]]
    empty: bool  # True when no thread ids were found on the page


@dataclass
class IliasForumThread:
    title: str
    title_tag: Tag
    content_tag: Tag
    mtime: Optional[datetime]


@dataclass
class IliasLearningModulePage:
    """One page of an ILIAS learning module, with prev/next navigation URLs."""
    title: str
    content: Tag
    next_url: Optional[str]
    previous_url: Optional[str]


class IliasPage:
    """
    Wraps a parsed ILIAS HTML page and extracts crawlable child elements.

    `source_element` is the element that was followed to reach this page
    (if any); its type and name steer some of the page-type heuristics.
    """

    def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
        self._soup = soup
        self._page_url = _page_url
        self._page_type = source_element.type if source_element else None
        self._source_name = source_element.name if source_element else ""

    @staticmethod
    def is_root_page(soup: BeautifulSoup) -> bool:
        """Check the permalink input field to detect the ILIAS root page."""
        permalink = soup.find(id="current_perma_link")
        if permalink is None:
            return False
        value = permalink.attrs.get("value")
        if value is None:
            return False
        return "goto.php?target=root_" in value

    def get_child_elements(self) -> List[IliasPageElement]:
        """
        Return all child page elements you can find here.
        """
        # Dispatch on the detected page type; the order matters, as the
        # checks go from most to least specific.
        if self._is_video_player():
            log.explain("Page is a video player, extracting URL")
            return self._player_to_video()
        if self._is_opencast_video_listing():
            log.explain("Page is an opencast video listing, searching for elements")
            return self._find_opencast_video_entries()
        if self._is_exercise_file():
            log.explain("Page is an exercise, searching for elements")
            return self._find_exercise_entries()
        if self._is_personal_desktop():
            log.explain("Page is the personal desktop, searching for elements")
            return self._find_personal_desktop_entries()
        if self._is_content_page():
            log.explain("Page is a content page, searching for elements")
            return self._find_copa_entries()
        if self._is_info_tab():
            log.explain("Page is info tab, searching for elements")
            return self._find_info_tab_entries()
        log.explain("Page is a normal folder, searching for elements")
        return self._find_normal_entries()

    def get_info_tab(self) -> Optional[IliasPageElement]:
        """Return a link to this page's info tab, if one exists."""
        tab: Optional[Tag] = self._soup.find(
            name="a",
            attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
        )
        if tab is not None:
            return IliasPageElement(
                IliasElementType.INFO_TAB,
                self._abs_url_from_link(tab),
                "infos"
            )
        return None

    def get_description(self) -> Optional[BeautifulSoup]:
        """Collect the page's description paragraphs into a fresh soup, or None."""
        def is_interesting_class(name: str) -> bool:
            return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]

        paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class)
        if not paragraphs:
            return None

        # Extract bits and pieces into a string and parse it again.
        # This ensures we don't miss anything and weird structures are resolved
        # somewhat gracefully.
        raw_html = ""
        for p in paragraphs:
            # Skip nested interesting tags; their top-level parent is kept whole
            if p.find_parent(class_=is_interesting_class):
                continue

            # Ignore special listings (like folder groupings)
            if "ilc_section_Special" in p["class"]:
                continue

            raw_html += str(p) + "\n"
        raw_html = f"<body>\n{raw_html}\n</body>"

        return BeautifulSoup(raw_html, "html.parser")

    def get_learning_module_data(self) -> Optional[IliasLearningModulePage]:
        """Extract title/content/navigation of a learning module page, or None."""
        if not self._is_learning_module_page():
            return None
        content = self._soup.select_one("#ilLMPageContent")
        title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip()
        return IliasLearningModulePage(
            title=title,
            content=content,
            next_url=self._find_learning_module_next(),
            previous_url=self._find_learning_module_prev()
        )

    def _find_learning_module_next(self) -> Optional[str]:
        # Right navigation link leads to the next learning module page
        for link in self._soup.select("a.ilc_page_rnavlink_RightNavigationLink"):
            url = self._abs_url_from_link(link)
            if "baseClass=ilLMPresentationGUI" not in url:
                continue
            return url
        return None

    def _find_learning_module_prev(self) -> Optional[str]:
        # Left navigation link leads to the previous learning module page
        for link in self._soup.select("a.ilc_page_lnavlink_LeftNavigationLink"):
            url = self._abs_url_from_link(link)
            if "baseClass=ilLMPresentationGUI" not in url:
                continue
            return url
        return None

    def get_download_forum_data(self) -> Optional[IliasDownloadForumData]:
        """Build the form POST needed to export all forum threads as HTML."""
        form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x})
        if not form:
            return None
        post_url = self._abs_url_from_relative(form["action"])

        thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})]

        form_data: Dict[str, Union[str, List[str]]] = {
            "thread_ids[]": thread_ids,
            "selected_cmd2": "html",
            "select_cmd2": "Ausführen",
            "selected_cmd": "",
        }

        return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0)

    def get_next_stage_element(self) -> Optional[IliasPageElement]:
        """
        Return a follow-up page that must be fetched before this page's real
        content can be extracted (e.g. to show all forum threads, break video
        pagination, expand future meetings, or select the content tab).
        Returns None when this page is already final.
        """
        if self._is_forum_page():
            if "trows=800" in self._page_url:
                return None  # already requested the expanded thread list
            log.explain("Requesting *all* forum threads")
            return self._get_show_max_forum_entries_per_page_url()
        if self._is_ilias_opencast_embedding():
            log.explain("Unwrapping opencast embedding")
            return self.get_child_elements()[0]
        if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
            log.explain("Unwrapping video pagination")
            return self._find_opencast_video_entries_paginated()[0]
        if self._contains_collapsed_future_meetings():
            log.explain("Requesting *all* future meetings")
            return self._uncollapse_future_meetings_url()
        if not self._is_content_tab_selected():
            if self._page_type != IliasElementType.INFO_TAB:
                log.explain("Selecting content tab")
                return self._select_content_page_url()
            else:
                log.explain("Crawling info tab, skipping content select")
        return None

    def _is_forum_page(self) -> bool:
        # Forum pages have a "mark all read" button
        read_more_btn = self._soup.find(
            "button",
            attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x}
        )
        return read_more_btn is not None

    def _is_video_player(self) -> bool:
        return "paella_config_file" in str(self._soup)

    def _is_opencast_video_listing(self) -> bool:
        if self._is_ilias_opencast_embedding():
            return True

        # Raw listing without ILIAS fluff
        video_element_table: Tag = self._soup.find(
            name="table", id=re.compile(r"tbl_xoct_.+")
        )
        return video_element_table is not None

    def _is_ilias_opencast_embedding(self) -> bool:
        # ILIAS fluff around the real opencast html
        if self._soup.find(id="headerimage"):
            element: Tag = self._soup.find(id="headerimage")
            if "opencast" in element.attrs["src"].lower():
                return True
        return False

    def _is_exercise_file(self) -> bool:
        # we know it from before
        if self._page_type == IliasElementType.EXERCISE:
            return True

        # We have no suitable parent - let's guess from the header image
        if self._soup.find(id="headerimage"):
            element: Tag = self._soup.find(id="headerimage")
            if "exc" in element.attrs["src"].lower():
                return True

        return False

    def _is_personal_desktop(self) -> bool:
        # NOTE(review): returns the found Tag (or None), relying on truthiness
        # despite the bool annotation
        return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})

    def _is_content_page(self) -> bool:
        link = self._soup.find(id="current_perma_link")
        if not link:
            return False
        return "target=copa_" in link.get("value")

    def _is_learning_module_page(self) -> bool:
        link = self._soup.find(id="current_perma_link")
        if not link:
            return False
        return "target=pg_" in link.get("value")

    def _contains_collapsed_future_meetings(self) -> bool:
        return self._uncollapse_future_meetings_url() is not None

    def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
        # Link that expands the next/previous sessions in a course
        element = self._soup.find(
            "a",
            attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
        )
        if not element:
            return None
        link = self._abs_url_from_link(element)
        return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings")

    def _is_content_tab_selected(self) -> bool:
        return self._select_content_page_url() is None

    def _is_info_tab(self) -> bool:
        might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
        return self._page_type == IliasElementType.INFO_TAB and might_be_info

    def _select_content_page_url(self) -> Optional[IliasPageElement]:
        # Find the content tab only when it is NOT the active one
        tab = self._soup.find(
            id="tab_view_content",
            attrs={"class": lambda x: x is not None and "active" not in x}
        )
        # Already selected (or not found)
        if not tab:
            return None
        link = tab.find("a")
        if link:
            link = self._abs_url_from_link(link)
            return IliasPageElement(IliasElementType.FOLDER, link, "select content page")

        _unexpected_html_warning()
        log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.")
        log.warn_contd("PFERD might not find content on the course's main page.")
        return None

    def _player_to_video(self) -> List[IliasPageElement]:
        # Fetch the actual video page. This is a small wrapper page initializing a javascript
        # player. Sadly we can not execute that JS. The actual video stream url is nowhere
        # on the page, but defined in a JS object inside a script tag, passed to the player
        # library.
        # We do the impossible and RegEx the stream JSON object out of the page's HTML source
        regex = re.compile(
            r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
        )
        json_match = regex.search(str(self._soup))

        if json_match is None:
            log.warn("Could not find JSON stream info in video player. Ignoring video.")
            return []
        json_str = json_match.group(1)

        # parse it
        json_object = json.loads(json_str)
        streams = [stream for stream in json_object["streams"]]

        # and just fetch the lone video url!
        if len(streams) == 1:
            video_url = streams[0]["sources"]["mp4"][0]["src"]
            return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)]

        # Multiple streams: emit one element per stream, disambiguated by its
        # "content" label in the file name
        log.explain(f"Found multiple videos for stream at {self._source_name}")
        items = []
        for stream in sorted(streams, key=lambda stream: stream["content"]):
            full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
            video_url = stream["sources"]["mp4"][0]["src"]
            items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name))

        return items

    def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]:
        # Link that bumps the thread list to 800 rows per page
        correct_link = self._soup.find(
            "a",
            attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x}
        )

        if not correct_link:
            return None

        link = self._abs_url_from_link(correct_link)
        return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads")

    def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
        items: List[IliasPageElement] = []

        titles: List[Tag] = self._soup.select(".il-item-title")
        for title in titles:
            link = title.find("a")

            name = _sanitize_path_name(link.text.strip())
            url = self._abs_url_from_link(link)

            type = self._find_type_from_link(name, link, url)
            if not type:
                _unexpected_html_warning()
                log.warn_contd(f"Could not extract type for {link}")
                continue

            log.explain(f"Found {name!r}")

            # File links on the desktop point at the info page; rewrite the
            # target so we fetch the download directly
            if type == IliasElementType.FILE and "_download" not in url:
                url = re.sub(r"(target=file_\d+)", r"\1_download", url)
                log.explain("Rewired file URL to include download part")

            items.append(IliasPageElement(type, url, name))

        return items

    def _find_copa_entries(self) -> List[IliasPageElement]:
        items: List[IliasPageElement] = []
        links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink")

        for link in links:
            url = self._abs_url_from_link(link)
            # Strip the "(x.y MB)" size suffix and stray tabs from the label
            name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "")
            name = _sanitize_path_name(name)

            if "file_id" not in url:
                _unexpected_html_warning()
                log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
                continue

            items.append(IliasPageElement(IliasElementType.FILE, url, name))

        return items

    def _find_info_tab_entries(self) -> List[IliasPageElement]:
        items = []
        links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")

        for link in links:
            # Only course "send file" commands are downloadable entries here
            if "cmdClass=ilobjcoursegui" not in link["href"]:
                continue
            if "cmd=sendfile" not in link["href"]:
                continue
            items.append(IliasPageElement(
                IliasElementType.FILE,
                self._abs_url_from_link(link),
                _sanitize_path_name(link.getText())
            ))

        return items

    def _find_opencast_video_entries(self) -> List[IliasPageElement]:
        # ILIAS has three stages for video pages
        # 1. The initial dummy page without any videos. This page contains the link to the listing
        # 2. The video listing which might be paginated
        # 3. An unpaginated video listing (or at least one that includes 800 videos)
        #
        # We need to figure out where we are.

        video_element_table: Tag = self._soup.find(
            name="table", id=re.compile(r"tbl_xoct_.+")
        )

        if video_element_table is None:
            # We are in stage 1
            # The page is actually empty but contains the link to stage 2
            content_link: Tag = self._soup.select_one("#tab_series a")
            url: str = self._abs_url_from_link(content_link)
            query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
            url = url_set_query_params(url, query_params)
            log.explain("Found ILIAS video frame page, fetching actual content next")
            return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]

        is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None

        if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
            # We are in stage 2 - try to break pagination
            return self._find_opencast_video_entries_paginated()

        return self._find_opencast_video_entries_no_paging()

    def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]:
        table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))

        if table_element is None:
            log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
            return self._find_opencast_video_entries_no_paging()

        id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
        if id_match is None:
            log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
            return self._find_opencast_video_entries_no_paging()

        table_id = id_match.group(1)

        # Request up to 800 rows for this specific table to defeat pagination
        query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
        url = url_set_query_params(self._page_url, query_params)

        log.explain("Disabled pagination, retrying folder as a new entry")
        return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]

    def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
        """
        Crawls the "second stage" video page. This page contains the actual video urls.
        """
        # Video start links are marked with an "Abspielen" link
        video_links: List[Tag] = self._soup.findAll(
            name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")
        )

        results: List[IliasPageElement] = []

        for link in video_links:
            results.append(self._listed_opencast_video_to_element(link))

        return results

    def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement:
        # The link is part of a table with multiple columns, describing metadata.
        # 6th or 7th child (1 indexed) is the modification time string. Try to find it
        # by parsing backwards from the end and finding something that looks like a date
        modification_time = None
        row: Tag = link.parent.parent.parent
        column_count = len(row.select("td.std"))
        for index in range(column_count, 0, -1):
            modification_string = link.parent.parent.parent.select_one(
                f"td.std:nth-child({index})"
            ).getText().strip()
            if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
                modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
                break

        if modification_time is None:
            log.warn(f"Could not determine upload time for {link}")
            modification_time = datetime.now()

        # Third column holds the video title
        title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
        title += ".mp4"

        video_name: str = _sanitize_path_name(title)

        video_url = self._abs_url_from_link(link)

        log.explain(f"Found video {video_name!r} at {video_url}")
        return IliasPageElement(
            IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
        )

    def _find_exercise_entries(self) -> List[IliasPageElement]:
        if self._soup.find(id="tab_submission"):
            log.explain("Found submission tab. This is an exercise detail page")
            return self._find_exercise_entries_detail_page()
        log.explain("Found no submission tab. This is an exercise root page")
        return self._find_exercise_entries_root_page()

    def _find_exercise_entries_detail_page(self) -> List[IliasPageElement]:
        results: List[IliasPageElement] = []

        # Find all download links in the container (this will contain all the files)
        download_links: List[Tag] = self._soup.findAll(
            name="a",
            # download links contain the given command class
            attrs={"href": lambda x: x and "cmd=download" in x},
            text="Download"
        )

        for link in download_links:
            parent_row: Tag = link.findParent("tr")
            children: List[Tag] = parent_row.findChildren("td")

            name = _sanitize_path_name(children[1].getText().strip())
            log.explain(f"Found exercise detail entry {name!r}")

            # Scan the row's cells right-to-left for something date-like
            for child in reversed(children):
                date = demangle_date(child.getText().strip(), fail_silently=True)
                if date is not None:
                    break
            if date is None:
                log.warn(f"Date parsing failed for exercise entry {name!r}")

            results.append(IliasPageElement(
                IliasElementType.FILE,
                self._abs_url_from_link(link),
                name,
                date
            ))

        return results

    def _find_exercise_entries_root_page(self) -> List[IliasPageElement]:
        results: List[IliasPageElement] = []

        # Each assignment is in an accordion container
        assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer")

        for container in assignment_containers:
            # Fetch the container name out of the header to use it in the path
            container_name = container.select_one(".ilAssignmentHeader").getText().strip()
            log.explain(f"Found exercise container {container_name!r}")

            # Find all download links in the container (this will contain all the files)
            files: List[Tag] = container.findAll(
                name="a",
                # download links contain the given command class
                attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
                text="Download"
            )

            # Grab each file as you now have the link
            for file_link in files:
                # Two divs, side by side. Left is the name, right is the link ==> get left
                # sibling
                file_name = file_link.parent.findPrevious(name="div").getText().strip()
                file_name = _sanitize_path_name(file_name)
                url = self._abs_url_from_link(file_link)

                log.explain(f"Found exercise entry {file_name!r}")
                results.append(IliasPageElement(
                    IliasElementType.FILE,
                    url,
                    container_name + "/" + file_name,
                    None  # We do not have any timestamp
                ))

            # Find all links to file listings (e.g. "Submitted Files" for groups)
            file_listings: List[Tag] = container.findAll(
                name="a",
                # download links contain the given command class
                attrs={"href": lambda x: x and "cmdClass=ilexsubmissionfilegui" in x}
            )

            # Add each listing as a new element
            for listing in file_listings:
                parent_container: Tag = listing.findParent(
                    "div", attrs={"class": lambda x: x and "form-group" in x}
                )
                label_container: Tag = parent_container.find(
                    attrs={"class": lambda x: x and "control-label" in x}
                )
                file_name = _sanitize_path_name(label_container.getText().strip())
                url = self._abs_url_from_link(listing)
                log.explain(f"Found exercise detail {file_name!r} at {url}")
                results.append(IliasPageElement(
                    IliasElementType.EXERCISE_FILES,
                    url,
                    container_name + "/" + file_name,
                    None  # we do not have any timestamp
                ))

        return results

    def _find_normal_entries(self) -> List[IliasPageElement]:
        result: List[IliasPageElement] = []

        # Fetch all links and throw them to the general interpreter
        links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")

        for link in links:
            abs_url = self._abs_url_from_link(link)
            # Prefix the name with the enclosing folder hierarchy, if any
            parents = self._find_upwards_folder_hierarchy(link)

            if parents:
                element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText())
            else:
                element_name = _sanitize_path_name(link.getText())

            element_type = self._find_type_from_link(element_name, link, abs_url)
            description = self._find_link_description(link)

            # The last meeting on every page is expanded by default.
            # Its content is then shown inline *and* in the meeting page itself.
            # We should skip the inline content.
            if element_type != IliasElementType.MEETING and self._is_in_expanded_meeting(link):
                continue

            if not element_type:
                continue
            if element_type == IliasElementType.MEETING:
                normalized = _sanitize_path_name(self._normalize_meeting_name(element_name))
                log.explain(f"Normalized meeting name from {element_name!r} to {normalized!r}")
                element_name = normalized
            elif element_type == IliasElementType.FILE:
                result.append(self._file_to_element(element_name, abs_url, link))
                continue

            log.explain(f"Found {element_name!r}")
            result.append(IliasPageElement(element_type, abs_url, element_name, description=description))

        result += self._find_cards()
        result += self._find_mediacast_videos()

        return result

    def _find_mediacast_videos(self) -> List[IliasPageElement]:
        videos: List[IliasPageElement] = []

        for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
            element_name = _sanitize_path_name(
                elem.select_one(".ilPlayerPreviewDescription").getText().strip()
            )
            if not element_name.endswith(".mp4"):
                # just to make sure it has some kinda-alrightish ending
                element_name = element_name + ".mp4"
            video_element = elem.find(name="video")
            if not video_element:
                _unexpected_html_warning()
                log.warn_contd(f"No