import json
import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import List, Optional, Union
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup, Tag

from PFERD.utils import url_set_query_params

TargetType = Union[str, int]


class IliasElementType(Enum):
    EXERCISE = "exercise"
    FILE = "file"
    FOLDER = "folder"
    FORUM = "forum"
    LINK = "link"
    MEETING = "meeting"
    VIDEO = "video"
    VIDEO_PLAYER = "video_player"
    VIDEO_FOLDER = "video_folder"
    VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"


@dataclass
class IliasPageElement:
    type: IliasElementType
    url: str
    name: str
    mtime: Optional[datetime] = None
    description: Optional[str] = None


class IliasPage:

    def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
        self._soup = soup
        self._page_url = _page_url
        self._page_type = source_element.type if source_element else None
        self._source_name = source_element.name if source_element else ""

    def get_child_elements(self) -> List[IliasPageElement]:
        """
        Return all child page elements you can find here.
        """
        if self._is_video_player():
            return self._player_to_video()
        if self._is_video_listing():
            return self._find_video_entries()
        if self._is_exercise_file():
            return self._find_exercise_entries()
        return self._find_normal_entries()

    def _is_video_player(self) -> bool:
        return "paella_config_file" in str(self._soup)

    def _is_video_listing(self) -> bool:
        # Listing with ILIAS fluff around it
        if self._soup.find(id="headerimage"):
            element: Tag = self._soup.find(id="headerimage")
            if "opencast" in element.attrs["src"].lower():
                return True

        # Raw listing without ILIAS fluff
        video_element_table: Optional[Tag] = self._soup.find(
            name="table", id=re.compile(r"tbl_xoct_.+")
        )
        return video_element_table is not None

    def _is_exercise_file(self) -> bool:
        # We already know the type from the source element
        if self._page_type == IliasElementType.EXERCISE:
            return True

        # We have no suitable parent - let's guess
        if self._soup.find(id="headerimage"):
            element: Tag = self._soup.find(id="headerimage")
            if "exc" in element.attrs["src"].lower():
                return True

        return False

    def _player_to_video(self) -> List[IliasPageElement]:
        # Fetch the actual video page. This is a small wrapper page initializing a javascript
        # player. Sadly we can not execute that JS. The actual video stream url is nowhere
        # visible on the page, but defined in a JS object inside a script tag and passed to
        # the player library.
        # We do the impossible and regex the stream JSON object out of the page's HTML source
        regex: re.Pattern[str] = re.compile(
            r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
        )
        json_match = regex.search(str(self._soup))

        if json_match is None:
            # TODO: Properly log this
            print(f"Could not find json stream info for {self._page_url!r}")
            return []
        json_str = json_match.group(1)

        # Parse it
        json_object = json.loads(json_str)
        # and fetch the video url!
        video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
        return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
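
    # For reference, the JSON blob regexed out of the player page has roughly the
    # shape sketched below. This is inferred from the access pattern in
    # _player_to_video; the values are illustrative only, not taken from a real
    # ILIAS response:
    #
    #   {"streams": [{"sources": {"mp4": [{"src": "https://streaming.example/video.mp4"}]}}]}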

    def _find_video_entries(self) -> List[IliasPageElement]:
        # ILIAS has three stages for video pages:
        # 1. The initial dummy page without any videos. This page contains the link to the listing.
        # 2. The video listing, which might be paginated.
        # 3. An unpaginated video listing (or at least one that includes up to 800 videos).
        #
        # We need to figure out where we are.
        video_element_table: Optional[Tag] = self._soup.find(
            name="table", id=re.compile(r"tbl_xoct_.+")
        )

        if video_element_table is None:
            # We are in stage 1
            # The page is actually empty but contains the link to stage 2
            content_link: Tag = self._soup.select_one("#tab_series a")
            url: str = self._abs_url_from_link(content_link)
            query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
            url = url_set_query_params(url, query_params)
            return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]

        is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None

        if is_paginated and self._page_type != IliasElementType.VIDEO_FOLDER:
            # We are in stage 2 - try to break pagination
            return self._find_video_entries_paginated()

        return self._find_video_entries_no_paging()

    def _find_video_entries_paginated(self) -> List[IliasPageElement]:
        table_element: Optional[Tag] = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))

        if table_element is None:
            # TODO: Properly log this
            print(
                "Could not increase elements per page (table not found)."
                " Some might not be crawled!"
            )
            return self._find_video_entries_no_paging()

        id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
        if id_match is None:
            # TODO: Properly log this
            print(
                "Could not increase elements per page (table id not found)."
                " Some might not be crawled!"
            )
            return self._find_video_entries_no_paging()

        table_id = id_match.group(1)

        query_params = {
            f"tbl_xoct_{table_id}_trows": "800",
            "cmd": "asyncGetTableGUI",
            "cmdMode": "asynch",
        }
        url = url_set_query_params(self._page_url, query_params)
        return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]

    def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
        """
        Crawls the "second stage" video page. This page contains the actual video urls.
        """
        # Video start links are marked with an "Abspielen" link
        video_links: List[Tag] = self._soup.findAll(
            name="a", text=re.compile(r"\s*Abspielen\s*")
        )

        results: List[IliasPageElement] = []

        # TODO: Sadly the download button is currently broken, so never use that
        for link in video_links:
            results.append(self._listed_video_to_element(link))

        return results
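
    # A sketch of the pagination-breaking request built above, assuming
    # url_set_query_params merges the given parameters into the url's query string.
    # The table id "abc123" is illustrative only:
    #
    #   .../ilias.php?...&tbl_xoct_abc123_trows=800&cmd=asyncGetTableGUI&cmdMode=asynch
    #
    # ILIAS should then answer this asynchronous table request with an unpaginated
    # listing (stage 3).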

    def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
        # The link is part of a table with multiple columns, describing metadata.
        # 6th child (1 indexed) is the modification time string
        modification_string = link.parent.parent.parent.select_one(
            "td.std:nth-child(6)"
        ).getText().strip()
        modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")

        title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
        title += ".mp4"

        video_name: str = _sanitize_path_name(title)

        video_url = self._abs_url_from_link(link)

        return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)

    def _find_exercise_entries(self) -> List[IliasPageElement]:
        results: List[IliasPageElement] = []

        # Each assignment is in an accordion container
        assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer")

        for container in assignment_containers:
            # Fetch the container name out of the header to use it in the path
            container_name = container.select_one(".ilAssignmentHeader").getText().strip()
            # Find all download links in the container (this will contain all the files)
            files: List[Tag] = container.findAll(
                name="a",
                # Download links contain the given command class
                attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
                text="Download"
            )

            # Grab each file, as we now have the link
            for file_link in files:
                # Two divs, side by side. Left is the name, right is the link ==> get the
                # left sibling
                file_name = file_link.parent.findPrevious(name="div").getText().strip()
                file_name = _sanitize_path_name(file_name)
                url = self._abs_url_from_link(file_link)

                results.append(IliasPageElement(
                    IliasElementType.FILE,
                    url,
                    container_name + "/" + file_name,
                    None  # We do not have any timestamp
                ))

        return results

    def _find_normal_entries(self) -> List[IliasPageElement]:
        result: List[IliasPageElement] = []

        # Fetch all links and throw them to the general interpreter
        links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")

        for link in links:
            abs_url = self._abs_url_from_link(link)
            element_name = _sanitize_path_name(link.getText())
            element_type = self._find_type_from_link(element_name, link, abs_url)
            description = self._find_link_description(link)

            if not element_type:
                continue
            if element_type == IliasElementType.MEETING:
                element_name = _sanitize_path_name(self._normalize_meeting_name(element_name))
            elif element_type == IliasElementType.FILE:
                result.append(self._file_to_element(element_name, abs_url, link))
                continue

            result.append(IliasPageElement(element_type, abs_url, element_name, description=description))

        return result

    def _find_link_description(self, link: Tag) -> Optional[str]:
        tile: Optional[Tag] = link.findParent("div", {"class": lambda x: x and "il_ContainerListItem" in x})
        if not tile:
            return None
        description_element: Optional[Tag] = tile.find("div", {"class": lambda x: x and "il_Description" in x})
        if not description_element:
            return None
        return description_element.getText().strip()
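
    # For orientation, the file property list parsed by _file_to_element below
    # typically renders like the sketch here. The structure is inferred from the
    # selectors used; the values are illustrative only:
    #
    #   <div class="il_ItemProperties">
    #     <span class="il_ItemProperty">pdf</span>
    #     <span class="il_ItemProperty">1,2 MB</span>
    #     <span class="il_ItemProperty">20. Apr 2020, 16:42</span>
    #   </div>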

    def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement:
        # Files have a list of properties (type, modification date, size, etc.)
        # in a series of divs.
        # Find the parent containing all those divs, so we can filter out what we need
        properties_parent: Tag = link_element.findParent(
            "div", {"class": lambda x: "il_ContainerListItem" in x}
        ).select_one(".il_ItemProperties")
        # The first one is always the filetype
        file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()

        # The rest does not have a stable order. Grab the whole text and regex the date
        # out of it
        all_properties_text = properties_parent.getText().strip()
        modification_date_match = re.search(
            r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
            all_properties_text
        )
        if modification_date_match is None:
            modification_date = None
            # TODO: Properly log this
            print(f"Could not extract modification date from {all_properties_text!r}")
        else:
            modification_date_str = modification_date_match.group(1)
            modification_date = demangle_date(modification_date_str)

        # Grab the name from the link text
        name = _sanitize_path_name(link_element.getText())
        full_path = name + "." + file_type

        return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)

    @staticmethod
    def _find_type_from_link(
            element_name: str,
            link_element: Tag,
            url: str
    ) -> Optional[IliasElementType]:
        """
        Decides which sub crawler to use for a given top level element.
        """
        parsed_url = urlparse(url)

        # File URLs contain "target=file_"
        if "target=file_" in parsed_url.query:
            return IliasElementType.FILE

        # Skip forums
        if "cmd=showThreads" in parsed_url.query:
            return IliasElementType.FORUM

        # Everything with a ref_id can *probably* be opened to reveal nested things:
        # video groups, directories, exercises, etc.
        if "ref_id=" in parsed_url.query:
            return IliasPage._find_type_from_folder_like(link_element, url)

        # TODO: Log this properly
        print(f"Unknown type: The element was at {element_name!r} and it is {link_element!r}")
        return None

    @staticmethod
    def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]:
        """
        Try crawling something that looks like a folder.
        """
        # pylint: disable=too-many-return-statements

        found_parent: Optional[Tag] = None

        # We look for the outer div of our inner link, to find information around it
        # (mostly the icon)
        for parent in link_element.parents:
            if "ilContainerListItemOuter" in parent["class"]:
                found_parent = parent
                break

        if found_parent is None:
            # TODO: Log this properly
            print(f"Could not find element icon for {url!r}")
            return None

        # Find the small descriptive icon to figure out the type
        img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")

        if img_tag is None:
            # TODO: Log this properly
            print(f"Could not find image tag for {url!r}")
            return None

        if "opencast" in str(img_tag["alt"]).lower():
            return IliasElementType.VIDEO_FOLDER

        if str(img_tag["src"]).endswith("icon_exc.svg"):
            return IliasElementType.EXERCISE

        if str(img_tag["src"]).endswith("icon_webr.svg"):
            return IliasElementType.LINK

        if str(img_tag["src"]).endswith("frm.svg"):
            return IliasElementType.FORUM

        if str(img_tag["src"]).endswith("sess.svg"):
            return IliasElementType.MEETING

        return IliasElementType.FOLDER

    @staticmethod
    def _normalize_meeting_name(meeting_name: str) -> str:
        """
        Normalizes meeting names, which have a relative time as their first part,
        to their date in ISO format.
        """
        date_portion_str = meeting_name.split(" - ")[0]
        date_portion = demangle_date(date_portion_str)

        if not date_portion:
            return meeting_name

        rest_of_name = meeting_name
        if rest_of_name.startswith(date_portion_str):
            rest_of_name = rest_of_name[len(date_portion_str):]

        return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name

    def _abs_url_from_link(self, link_tag: Tag) -> str:
        """
        Create an absolute url from an <a> tag.
        """
        return urljoin(self._page_url, link_tag.get("href"))
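

# An example of the meeting name normalization performed by _normalize_meeting_name,
# assuming the crawl runs on 2021-04-20. The meeting name itself is illustrative only:
#
#   "Heute, 16:00 - Wöchentliche Übung"  ->  "2021-04-20, 16:00 - Wöchentliche Übung"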
""" return urljoin(self._page_url, link_tag.get("href")) german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez'] english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] def demangle_date(date_str: str) -> Optional[datetime]: """ Demangle a given date in one of the following formats: "Gestern, HH:MM" "Heute, HH:MM" "Morgen, HH:MM" "dd. mon yyyy, HH:MM """ try: date_str = re.sub(r"\s+", " ", date_str) date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I) date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I) date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I) for german, english in zip(german_months, english_months): date_str = date_str.replace(german, english) # Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020" date_str = date_str.replace(english + ".", english) # We now have a nice english String in the format: "dd. mmm yyyy, hh:mm" day_part, time_part = date_str.split(",") day_str, month_str, year_str = day_part.split(" ") day = int(day_str.strip().replace(".", "")) month = english_months.index(month_str.strip()) + 1 year = int(year_str.strip()) hour_str, minute_str = time_part.split(":") hour = int(hour_str) minute = int(minute_str) return datetime(year, month, day, hour, minute) except Exception: # TODO: Properly log this print(f"Could not parse date {date_str!r}") return None def _format_date_english(date_to_format: date) -> str: month = english_months[date_to_format.month - 1] return f"{date_to_format.day:02d}. {month} {date_to_format.year:04d}" def _yesterday() -> date: return date.today() - timedelta(days=1) def _tomorrow() -> date: return date.today() + timedelta(days=1) def _sanitize_path_name(name: str) -> str: return name.replace("/", "-").replace("\\", "-").strip()