diff --git a/CHANGELOG.md b/CHANGELOG.md index ae82e4f..0a26913 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,8 +22,14 @@ ambiguous situations. ## Unreleased +### Added +- Support for ILIAS 9 + ### Changed - Added prettier CSS to forum threads +- Downloaded forum threads now link to the forum instead of the ILIAS thread +- Increase minimum supported Python version to 3.11 +- Do not crawl nested courses (courses linked in other courses) ## Fixed - File links in report on Windows diff --git a/PFERD/crawl/ilias/ilias_web_crawler.py b/PFERD/crawl/ilias/ilias_web_crawler.py index add49ee..52ecf92 100644 --- a/PFERD/crawl/ilias/ilias_web_crawler.py +++ b/PFERD/crawl/ilias/ilias_web_crawler.py @@ -22,7 +22,7 @@ from .async_helper import _iorepeat from .file_templates import Links, forum_thread_template, learning_module_template from .ilias_html_cleaner import clean, insert_base_markup from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, - IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) + IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export) from .shibboleth_login import ShibbolethLogin TargetType = Union[str, int] @@ -105,7 +105,6 @@ class IliasWebCrawlerSection(HttpCrawlerSection): _DIRECTORY_PAGES: Set[IliasElementType] = { - IliasElementType.COURSE, IliasElementType.EXERCISE, IliasElementType.EXERCISE_FILES, IliasElementType.FOLDER, @@ -217,11 +216,19 @@ instance's greatest bottleneck. 
async def _crawl_desktop(self) -> None: await self._crawl_url( - urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show") + urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), + crawl_nested_courses=True ) - async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: - if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id): + async def _crawl_url( + self, + url: str, + expected_id: Optional[int] = None, + crawl_nested_courses: bool = False + ) -> None: + if awaitable := await self._handle_ilias_page( + url, None, PurePath("."), expected_id, crawl_nested_courses + ): await awaitable async def _handle_ilias_page( @@ -230,6 +237,7 @@ instance's greatest bottleneck. current_element: Optional[IliasPageElement], path: PurePath, expected_course_id: Optional[int] = None, + crawl_nested_courses: bool = False ) -> Optional[Coroutine[Any, Any, None]]: maybe_cl = await self.crawl(path) if not maybe_cl: @@ -237,7 +245,9 @@ instance's greatest bottleneck. if current_element: self._ensure_not_seen(current_element, path) - return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id) + return self._crawl_ilias_page( + url, current_element, maybe_cl, expected_course_id, crawl_nested_courses + ) @anoncritical async def _crawl_ilias_page( @@ -246,6 +256,7 @@ instance's greatest bottleneck. current_element: Optional[IliasPageElement], cl: CrawlToken, expected_course_id: Optional[int] = None, + crawl_nested_courses: bool = False, ) -> None: elements: List[IliasPageElement] = [] # A list as variable redefinitions are not propagated to outer scopes @@ -267,12 +278,12 @@ instance's greatest bottleneck. 
# If we expect to find a root course, enforce it if current_parent is None and expected_course_id is not None: perma_link = IliasPage.get_soup_permalink(soup) - if not perma_link or "crs_" not in perma_link: + if not perma_link or "crs/" not in perma_link: raise CrawlError("Invalid course id? Didn't find anything looking like a course") if str(expected_course_id) not in perma_link: raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}") - page = IliasPage(soup, next_stage_url, current_parent) + page = IliasPage(soup, current_parent) if next_element := page.get_next_stage_element(): current_parent = next_element next_stage_url = next_element.url @@ -294,7 +305,7 @@ instance's greatest bottleneck. tasks: List[Awaitable[None]] = [] for element in elements: - if handle := await self._handle_ilias_element(cl.path, element): + if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses): tasks.append(asyncio.create_task(handle)) # And execute them @@ -310,6 +321,7 @@ instance's greatest bottleneck. self, parent_path: PurePath, element: IliasPageElement, + crawl_nested_courses: bool = False ) -> Optional[Coroutine[Any, Any, None]]: # element.name might contain `/` if the crawler created nested elements, # so we can not sanitize it here. We trust in the output dir to thwart worst-case @@ -362,6 +374,56 @@ instance's greatest bottleneck. 
"[bright_black](scorm learning modules are not supported)" ) return None + elif element.type == IliasElementType.LITERATURE_LIST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](literature lists are not currently supported)" + ) + return None + elif element.type == IliasElementType.LEARNING_MODULE_HTML: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](HTML learning modules are not supported)" + ) + return None + elif element.type == IliasElementType.BLOG: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](blogs are not currently supported)" + ) + return None + elif element.type == IliasElementType.DCL_RECORD_LIST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](dcl record lists are not currently supported)" + ) + return None + elif element.type == IliasElementType.MEDIA_POOL: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](media pools are not currently supported)" + ) + return None + elif element.type == IliasElementType.COURSE: + if crawl_nested_courses: + return await self._handle_ilias_page(element.url, element, element_path) + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](not descending into linked course)" + ) + return None elif element.type == IliasElementType.LEARNING_MODULE: return await self._handle_learning_module(element, element_path) elif element.type == IliasElementType.LINK: @@ -590,7 +652,7 @@ instance's greatest bottleneck. ) async with dl as (bar, sink): - page = IliasPage(await self._get_page(element.url), element.url, element) + page = IliasPage(await self._get_page(element.url), element) stream_elements = page.get_child_elements() if len(stream_elements) > 1: @@ -600,7 +662,7 @@ instance's greatest bottleneck. 
stream_element = stream_elements[0] # We do not have a local cache yet - await self._stream_from_url(stream_element.url, sink, bar, is_video=True) + await self._stream_from_url(stream_element, sink, bar, is_video=True) add_to_report([str(self._transformer.transform(dl.path))]) return @@ -615,7 +677,7 @@ instance's greatest bottleneck. async with maybe_dl as (bar, sink): log.explain(f"Streaming video from real url {stream_element.url}") contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) - await self._stream_from_url(stream_element.url, sink, bar, is_video=True) + await self._stream_from_url(stream_element, sink, bar, is_video=True) add_to_report(contained_video_paths) @@ -637,12 +699,19 @@ instance's greatest bottleneck. async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None: assert dl # The function is only reached when dl is not None async with dl as (bar, sink): - await self._stream_from_url(element.url, sink, bar, is_video) + await self._stream_from_url(element, sink, bar, is_video) + + async def _stream_from_url( + self, + element: IliasPageElement, + sink: FileSink, + bar: ProgressBar, + is_video: bool + ) -> None: + url = element.url - async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None: async def try_stream() -> bool: next_url = url - # Normal files redirect to the magazine if we are not authenticated. As files could be HTML, # we can not match on the content type here. Instead, we disallow redirects and inspect the # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume @@ -690,7 +759,7 @@ instance's greatest bottleneck. 
await self.authenticate(auth_id) if not await try_stream(): - raise CrawlError("File streaming failed after authenticate()") + raise CrawlError(f"File streaming failed after authenticate() {element!r}") async def _handle_forum( self, @@ -705,70 +774,23 @@ instance's greatest bottleneck. @_iorepeat(3, "crawling forum") @anoncritical async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None: - elements: List[IliasForumThread] = [] - async with cl: - next_stage_url = element.url - page = None - - while next_stage_url: - log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") - log.explain(f"URL: {next_stage_url}") - - soup = await self._get_page(next_stage_url) - page = IliasPage(soup, next_stage_url, element) - - if next := page.get_next_stage_element(): - next_stage_url = next.url - else: - break - - forum_threads: list[tuple[IliasPageElement, bool]] = [] - for entry in cast(IliasPage, page).get_forum_entries(): - path = cl.path / (_sanitize_path_name(entry.name) + ".html") - forum_threads.append((entry, self.should_try_download(path, mtime=entry.mtime))) - - # Sort the ids. 
The forum download will *preserve* this ordering - forum_threads.sort(key=lambda elem: elem[0].id()) - - if not forum_threads: - log.explain("Forum had no threads") + inner = IliasPage(await self._get_page(element.url), element) + export_url = inner.get_forum_export_url() + if not export_url: + log.warn("Could not extract forum export url") return - download_data = cast(IliasPage, page).get_download_forum_data( - [thread.id() for thread, download in forum_threads if download] - ) - if not download_data: - raise CrawlWarning("Failed to extract forum data") + export = await self._post(export_url, { + "format": "html", + "cmd[createExportFile]": "" + }) - if not download_data.empty: - html = await self._post_authenticated(download_data.url, download_data.form_data) - elements = parse_ilias_forum_export(soupify(html)) - else: - elements = [] - - # Verify that ILIAS does not change the order, as we depend on it later. Otherwise, we could not call - # download in the correct order, potentially messing up duplication handling. - expected_element_titles = [thread.name for thread, download in forum_threads if download] - actual_element_titles = [_sanitize_path_name(thread.name) for thread in elements] - if expected_element_titles != actual_element_titles: - raise CrawlWarning( - f"Forum thread order mismatch: {expected_element_titles} != {actual_element_titles}" - ) + elements = parse_ilias_forum_export(soupify(export)) tasks: List[Awaitable[None]] = [] - for thread, download in forum_threads: - if download: - # This only works because ILIAS keeps the order in the export - elem = elements.pop(0) - tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem, thread))) - else: - # We only downloaded the threads we "should_try_download"ed. This can be an - # over-approximation and all will be fine. - # If we selected too few, e.g. because there was a duplicate title and the mtime of the - # original is newer than the update of the duplicate. 
- # This causes stale data locally, but I consider this problem acceptable right now. - tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, thread))) + for thread in elements: + tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url))) # And execute them await self.gather(tasks) @@ -779,7 +801,7 @@ instance's greatest bottleneck. self, parent_path: PurePath, thread: Union[IliasForumThread, IliasPageElement], - element: IliasPageElement + forum_url: str ) -> None: path = parent_path / (_sanitize_path_name(thread.name) + ".html") maybe_dl = await self.download(path, mtime=thread.mtime) @@ -789,7 +811,7 @@ instance's greatest bottleneck. async with maybe_dl as (bar, sink): rendered = forum_thread_template( thread.name, - element.url, + forum_url, thread.name_tag, await self.internalize_images(thread.content_tag) ) @@ -817,7 +839,7 @@ instance's greatest bottleneck. log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain(f"URL: {element.url}") soup = await self._get_page(element.url) - page = IliasPage(soup, element.url, element) + page = IliasPage(soup, element) if next := page.get_learning_module_data(): elements.extend(await self._crawl_learning_module_direction( cl.path, next.previous_url, "left", element @@ -860,7 +882,7 @@ instance's greatest bottleneck. log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain(f"URL: {next_element_url}") soup = await self._get_page(next_element_url) - page = IliasPage(soup, next_element_url, parent_element) + page = IliasPage(soup, parent_element) if next := page.get_learning_module_data(): elements.append(next) if dir == "left": @@ -891,13 +913,13 @@ instance's greatest bottleneck. 
if prev: prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) if prev_p: - prev = os.path.relpath(prev_p, my_path.parent) + prev = cast(str, os.path.relpath(prev_p, my_path.parent)) else: prev = None if next: next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) if next_p: - next = os.path.relpath(next_p, my_path.parent) + next = cast(str, os.path.relpath(next_p, my_path.parent)) else: next = None @@ -937,10 +959,10 @@ instance's greatest bottleneck. ) self._visited_urls[element.url] = parent_path - async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup: + async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup: auth_id = await self._current_auth_id() async with self.session.get(url) as request: - soup = soupify(await request.read()) + soup = IliasSoup(soupify(await request.read()), str(request.url)) if IliasPage.is_logged_in(soup): return self._verify_page(soup, url, root_page_allowed) @@ -949,13 +971,13 @@ instance's greatest bottleneck. # Retry once after authenticating. If this fails, we will die. async with self.session.get(url) as request: - soup = soupify(await request.read()) + soup = IliasSoup(soupify(await request.read()), str(request.url)) if IliasPage.is_logged_in(soup): return self._verify_page(soup, url, root_page_allowed) raise CrawlError(f"get_page failed even after authenticating on {url!r}") @staticmethod - def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup: + def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup: if IliasPage.is_root_page(soup) and not root_page_allowed: raise CrawlError( "Unexpectedly encountered ILIAS root page. " @@ -967,29 +989,19 @@ instance's greatest bottleneck. 
) return soup - async def _post_authenticated( + async def _post( self, url: str, data: dict[str, Union[str, List[str]]] ) -> bytes: - auth_id = await self._current_auth_id() - form_data = aiohttp.FormData() for key, val in data.items(): form_data.add_field(key, val) - async with self.session.post(url, data=form_data(), allow_redirects=False) as request: + async with self.session.post(url, data=form_data()) as request: if request.status == 200: return await request.read() - - # We weren't authenticated, so try to do that - await self.authenticate(auth_id) - - # Retry once after authenticating. If this fails, we will die. - async with self.session.post(url, data=data, allow_redirects=False) as request: - if request.status == 200: - return await request.read() - raise CrawlError("post_authenticated failed even after authenticating") + raise CrawlError(f"post failed with status {request.status}") async def _get_authenticated(self, url: str) -> bytes: auth_id = await self._current_auth_id() @@ -1037,34 +1049,6 @@ instance's greatest bottleneck. # do the actual login async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request: - soup = soupify(await request.read()) - if not self._is_logged_in(soup): + soup = IliasSoup(soupify(await request.read()), str(request.url)) + if not IliasPage.is_logged_in(soup): self._auth.invalidate_credentials() - - @staticmethod - def _is_logged_in(soup: BeautifulSoup) -> bool: - # Normal ILIAS pages - mainbar = cast(Optional[Tag], soup.find(class_="il-maincontrols-metabar")) - if mainbar is not None: - login_button = mainbar.find(attrs={"href": lambda x: x is not None and "login.php" in x}) - shib_login = soup.find(id="button_shib_login") - return not login_button and not shib_login - - # Personal Desktop - if soup.find("a", attrs={"href": lambda x: x is not None and "block_type=pditems" in x}): - return True - - # Video listing embeds do not have complete ILIAS html. 
Try to match them by - # their video listing table - video_table = soup.find( - recursive=True, - name="table", - attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")} - ) - if video_table is not None: - return True - # The individual video player wrapper page has nothing of the above. - # Match it by its playerContainer. - if soup.select_one("#playerContainer") is not None: - return True - return False diff --git a/PFERD/crawl/ilias/kit_ilias_html.py b/PFERD/crawl/ilias/kit_ilias_html.py index 963ab05..5ea17d6 100644 --- a/PFERD/crawl/ilias/kit_ilias_html.py +++ b/PFERD/crawl/ilias/kit_ilias_html.py @@ -3,20 +3,100 @@ import re from dataclasses import dataclass from datetime import date, datetime, timedelta from enum import Enum -from typing import Dict, Optional, Union, cast +from typing import Callable, Dict, Optional, Union, cast from urllib.parse import urljoin, urlparse from bs4 import BeautifulSoup, Tag +from PFERD.crawl import CrawlError +from PFERD.crawl.crawler import CrawlWarning from PFERD.logging import log from PFERD.utils import url_set_query_params TargetType = Union[str, int] +class TypeMatcher: + class UrlPath: + path: str + + def __init__(self, path: str): + self.path = path + + class UrlParameter: + query: str + + def __init__(self, query: str): + self.query = query + + class ImgSrc: + src: str + + def __init__(self, src: str): + self.src = src + + class ImgAlt: + alt: str + + def __init__(self, alt: str): + self.alt = alt + + class All: + matchers: list['IliasElementMatcher'] + + def __init__(self, matchers: list['IliasElementMatcher']): + self.matchers = matchers + + class Any: + matchers: list['IliasElementMatcher'] + + def __init__(self, matchers: list['IliasElementMatcher']): + self.matchers = matchers + + @staticmethod + def path(path: str) -> UrlPath: + return TypeMatcher.UrlPath(path) + + @staticmethod + def query(query: str) -> UrlParameter: + return TypeMatcher.UrlParameter(query) + + @staticmethod + def img_src(src: str) 
-> ImgSrc: + return TypeMatcher.ImgSrc(src) + + @staticmethod + def img_alt(alt: str) -> ImgAlt: + return TypeMatcher.ImgAlt(alt) + + @staticmethod + def all(*matchers: 'IliasElementMatcher') -> All: + return TypeMatcher.All(list(matchers)) + + @staticmethod + def any(*matchers: 'IliasElementMatcher') -> Any: + return TypeMatcher.Any(list(matchers)) + + @staticmethod + def never() -> Any: + return TypeMatcher.Any([]) + + +IliasElementMatcher = ( + TypeMatcher.UrlPath + | TypeMatcher.UrlParameter + | TypeMatcher.ImgSrc + | TypeMatcher.ImgAlt + | TypeMatcher.All + | TypeMatcher.Any +) + + class IliasElementType(Enum): + BLOG = "blog" BOOKING = "booking" COURSE = "course" + DCL_RECORD_LIST = "dcl_record_list" EXERCISE = "exercise" EXERCISE_FILES = "exercise_files" # own submitted files FILE = "file" @@ -25,7 +105,10 @@ class IliasElementType(Enum): FORUM_THREAD = "forum_thread" INFO_TAB = "info_tab" LEARNING_MODULE = "learning_module" + LEARNING_MODULE_HTML = "learning_module_html" + LITERATURE_LIST = "literature_list" LINK = "link" + MEDIA_POOL = "media_pool" MEDIACAST_VIDEO = "mediacast_video" MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" MEETING = "meeting" @@ -38,6 +121,131 @@ class IliasElementType(Enum): SURVEY = "survey" TEST = "test" # an online test. Will be ignored currently. 
+ def matcher(self) -> IliasElementMatcher: + match self: + case IliasElementType.BLOG: + return TypeMatcher.any( + TypeMatcher.img_src("_blog.svg") + ) + case IliasElementType.BOOKING: + return TypeMatcher.any( + TypeMatcher.path("/book/"), + TypeMatcher.img_src("_book.svg") + ) + case IliasElementType.COURSE: + return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg")) + case IliasElementType.DCL_RECORD_LIST: + return TypeMatcher.any( + TypeMatcher.img_src("_dcl.svg"), + TypeMatcher.query("cmdclass=ildclrecordlistgui") + ) + case IliasElementType.EXERCISE: + return TypeMatcher.any( + TypeMatcher.path("/exc/"), + TypeMatcher.path("_exc_"), + TypeMatcher.img_src("_exc.svg"), + ) + case IliasElementType.EXERCISE_FILES: + return TypeMatcher.never() + case IliasElementType.FILE: + return TypeMatcher.any( + TypeMatcher.query("cmd=sendfile"), + TypeMatcher.path("_file_"), + TypeMatcher.img_src("/filedelivery/"), + ) + case IliasElementType.FOLDER: + return TypeMatcher.any( + TypeMatcher.path("/fold/"), + TypeMatcher.img_src("_fold.svg"), + + TypeMatcher.path("/grp/"), + TypeMatcher.img_src("_grp.svg"), + + TypeMatcher.path("/copa/"), + TypeMatcher.path("_copa_"), + TypeMatcher.img_src("_copa.svg"), + + # Not supported right now but warn users + # TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), + # TypeMatcher.img_alt("medienpool"), + # TypeMatcher.img_src("_mep.svg"), + ) + case IliasElementType.FORUM: + return TypeMatcher.any( + TypeMatcher.path("/frm/"), + TypeMatcher.path("_frm_"), + TypeMatcher.img_src("_frm.svg"), + ) + case IliasElementType.FORUM_THREAD: + return TypeMatcher.never() + case IliasElementType.INFO_TAB: + return TypeMatcher.never() + case IliasElementType.LITERATURE_LIST: + return TypeMatcher.img_src("_bibl.svg") + case IliasElementType.LEARNING_MODULE: + return TypeMatcher.any( + TypeMatcher.path("/lm/"), + TypeMatcher.img_src("_lm.svg") + ) + case IliasElementType.LEARNING_MODULE_HTML: + return TypeMatcher.any( + 
TypeMatcher.query("baseclass=ilhtlmpresentationgui"), + TypeMatcher.img_src("_htlm.svg") + ) + case IliasElementType.LINK: + return TypeMatcher.any( + TypeMatcher.all( + TypeMatcher.query("baseclass=illinkresourcehandlergui"), + TypeMatcher.query("calldirectlink"), + ), + TypeMatcher.img_src("_webr.svg") + ) + case IliasElementType.MEDIA_POOL: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), + TypeMatcher.img_src("_mep.svg") + ) + case IliasElementType.MEDIACAST_VIDEO: + return TypeMatcher.never() + case IliasElementType.MEDIACAST_VIDEO_FOLDER: + return TypeMatcher.any( + TypeMatcher.path("/mcst/"), + TypeMatcher.query("baseclass=ilmediacasthandlergui"), + TypeMatcher.img_src("_mcst.svg") + ) + case IliasElementType.MEETING: + return TypeMatcher.any( + TypeMatcher.img_src("_sess.svg") + ) + case IliasElementType.MOB_VIDEO: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO_FOLDER: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED: + return TypeMatcher.img_alt("opencast") + case IliasElementType.OPENCAST_VIDEO_PLAYER: + return TypeMatcher.never() + case IliasElementType.SCORM_LEARNING_MODULE: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilsahspresentationgui"), + TypeMatcher.img_src("_sahs.svg") + ) + case IliasElementType.SURVEY: + return TypeMatcher.any( + TypeMatcher.path("/svy/"), + TypeMatcher.img_src("svy.svg") + ) + case IliasElementType.TEST: + return TypeMatcher.any( + TypeMatcher.query("cmdclass=ilobjtestgui"), + TypeMatcher.query("cmdclass=iltestscreengui"), + TypeMatcher.img_src("_tst.svg") + ) + + raise CrawlWarning(f"Unknown matcher {self}") + @dataclass class IliasPageElement: @@ -50,11 +258,21 @@ class IliasPageElement: def id(self) -> str: regexes = [ r"eid=(?P[0-9a-z\-]+)", - r"file_(?P\d+)", - r"copa_(?P\d+)", - r"fold_(?P\d+)", - r"frm_(?P\d+)", - r"exc_(?P\d+)", + 
r"book/(?P\d+)", # booking + r"cat/(?P\d+)", + r"copa/(?P\d+)", # content page + r"crs/(?P\d+)", # course + r"exc/(?P\d+)", # exercise + r"file/(?P\d+)", # file + r"fold/(?P\d+)", # folder + r"frm/(?P\d+)", # forum + r"grp/(?P\d+)", # group + r"lm/(?P\d+)", # learning module + r"mcst/(?P\d+)", # mediacast + r"pg/(?P(\d|_)+)", # page? + r"svy/(?P\d+)", # survey + r"sess/(?P\d+)", # session + r"webr/(?P\d+)", # web reference (link) r"thr_pk=(?P\d+)", # forums r"ref_id=(?P\d+)", r"target=[a-z]+_(?P\d+)", @@ -139,18 +357,28 @@ class IliasLearningModulePage: previous_url: Optional[str] +class IliasSoup: + soup: BeautifulSoup + page_url: str + + def __init__(self, soup: BeautifulSoup, page_url: str): + self.soup = soup + self.page_url = page_url + + class IliasPage: - def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): - self._soup = soup - self._page_url = _page_url + def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]): + self._ilias_soup = ilias_soup + self._soup = ilias_soup.soup + self._page_url = ilias_soup.page_url self._page_type = source_element.type if source_element else None self._source_name = source_element.name if source_element else "" @staticmethod - def is_root_page(soup: BeautifulSoup) -> bool: + def is_root_page(soup: IliasSoup) -> bool: if permalink := IliasPage.get_soup_permalink(soup): - return "goto.php?target=root_" in permalink + return "goto.php/root/" in permalink return False def get_child_elements(self) -> list[IliasPageElement]: @@ -193,7 +421,10 @@ class IliasPage: def get_description(self) -> Optional[BeautifulSoup]: def is_interesting_class(name: str) -> bool: - return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] + return name in [ + "ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap", + "ilc_va_ihcap_AccordIHeadCap", "ilc_media_cont_MediaContainer" + ] paragraphs: list[Tag] = cast(list[Tag],
self._soup.find_all(class_=is_interesting_class)) if not paragraphs: @@ -206,6 +437,21 @@ class IliasPage: for p in paragraphs: if p.find_parent(class_=is_interesting_class): continue + if "ilc_media_cont_MediaContainer" in p["class"]: + # We have an embedded video which should be downloaded by _find_mob_videos + if video := p.select_one("video"): + url, title = self._find_mob_video_url_title(video, p) + raw_html += '
' + if url is not None and urlparse(url).hostname != urlparse(self._page_url).hostname: + if url.startswith("//"): + url = "https:" + url + raw_html += f'External Video: {title}' + else: + raw_html += f"Video elided. Filename: '{title}'." + raw_html += "
\n" + continue # Ignore special listings (like folder groupings) if "ilc_section_Special" in p["class"]: @@ -244,79 +490,31 @@ class IliasPage: return url return None - def get_forum_entries(self) -> list[IliasPageElement]: - form = self._get_forum_form() - if not form: - return [] - threads = [] - - for row in cast(list[Tag], form.select("table > tbody > tr")): - url_tag = cast( - Optional[Tag], - row.find(name="a", attrs={"href": lambda x: x is not None and "cmd=viewthread" in x.lower()}) - ) - if url_tag is None: - log.explain(f"Skipping row without URL: {row}") - continue - name = url_tag.get_text().strip() - columns = [td.get_text().strip() for td in cast(list[Tag], row.find_all(name="td"))] - potential_dates_opt = [IliasPage._find_date_in_text(column) for column in columns] - potential_dates = [x for x in potential_dates_opt if x is not None] - mtime = max(potential_dates) if potential_dates else None - - threads.append(IliasPageElement.create_new( - IliasElementType.FORUM_THREAD, - self._abs_url_from_link(url_tag), - name, - mtime=mtime - )) - - return threads - - def get_download_forum_data(self, thread_ids: list[str]) -> Optional[IliasDownloadForumData]: - form = cast(Optional[Tag], self._soup.find( - "form", - attrs={"action": lambda x: x is not None and "fallbackCmd=showThreads" in x} - )) - if not form: + def get_forum_export_url(self) -> Optional[str]: + forum_link = self._soup.select_one("#tab_forums_threads > a") + if not forum_link: + log.explain("Found no forum link") return None - post_url = self._abs_url_from_relative(cast(str, form["action"])) - log.explain(f"Fetching forum threads {thread_ids}") + base_url = self._abs_url_from_link(forum_link) + base_url = re.sub(r"cmd=\w+", "cmd=post", base_url) + base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url) - form_data: Dict[str, Union[str, list[str]]] = { - "thread_ids[]": cast(list[str], thread_ids), - "selected_cmd2": "html", - "select_cmd2": "Ausführen", - "selected_cmd": "", - } + 
rtoken_form = cast( + Optional[Tag], + self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x}) + ) + if not rtoken_form: + log.explain("Found no rtoken anywhere") + return None + match = cast(re.Match[str], re.search(r"rtoken=(\w+)", str(rtoken_form.attrs["action"]))) + rtoken = match.group(1) - return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0) + base_url = base_url + "&rtoken=" + rtoken - def _get_forum_form(self) -> Optional[Tag]: - return cast(Optional[Tag], self._soup.find( - "form", - attrs={"action": lambda x: x is not None and "fallbackCmd=showThreads" in x} - )) + return base_url def get_next_stage_element(self) -> Optional[IliasPageElement]: - if self._is_forum_page(): - if "trows=" in self._page_url: - log.explain("Manual row override detected, accepting it as good") - return None - log.explain("Requesting *all* forum threads") - thread_count = self._get_forum_thread_count() - if thread_count is not None and thread_count > 400: - log.warn( - "Forum has more than 400 threads, fetching all threads will take a while. " - "You might need to adjust your http_timeout config option." 
- ) - - # Fetch at least 400 in case we detect it wrong - if thread_count is not None and thread_count < 400: - thread_count = 400 - - return self._get_show_max_forum_entries_per_page_url(thread_count) if self._is_ilias_opencast_embedding(): log.explain("Unwrapping opencast embedding") return self.get_child_elements()[0] @@ -334,11 +532,6 @@ class IliasPage: log.explain("Crawling info tab, skipping content select") return None - def _is_forum_page(self) -> bool: - if perma_link := self.get_permalink(): - return "target=frm_" in perma_link - return False - def _is_video_player(self) -> bool: return "paella_config_file" in str(self._soup) @@ -378,7 +571,7 @@ class IliasPage: def _is_content_page(self) -> bool: if link := self.get_permalink(): - return "target=copa_" in link + return "/copa/" in link return False def _is_learning_module_page(self) -> bool: @@ -513,19 +706,17 @@ class IliasPage: # Configure button/link does not have anything interesting continue - type = self._find_type_from_link(name, link, url) - if not type: + typ = IliasPage._find_type_for_element( + name, url, lambda: IliasPage._find_icon_for_folder_entry(link) + ) + if not typ: _unexpected_html_warning() log.warn_contd(f"Could not extract type for {link}") continue - log.explain(f"Found {name!r}") + log.explain(f"Found {name!r} of type {typ}") - if type == IliasElementType.FILE and "_download" not in url: - url = re.sub(r"(target=file_\d+)", r"\1_download", url) - log.explain("Rewired file URL to include download part") - - items.append(IliasPageElement.create_new(type, url, name)) + items.append(IliasPageElement.create_new(typ, url, name)) return items @@ -786,15 +977,17 @@ class IliasPage: for link in links: abs_url = self._abs_url_from_link(link) # Make sure parents are sanitized. 
We do not want accidental parents - parents = [_sanitize_path_name(x) for x in self._find_upwards_folder_hierarchy(link)] + parents = [_sanitize_path_name(x) for x in IliasPage._find_upwards_folder_hierarchy(link)] if parents: element_name = "/".join(parents) + "/" + _sanitize_path_name(link.get_text()) else: element_name = _sanitize_path_name(link.get_text()) - element_type = self._find_type_from_link(element_name, link, abs_url) - description = self._find_link_description(link) + element_type = IliasPage._find_type_for_element( + element_name, abs_url, lambda: IliasPage._find_icon_for_folder_entry(link) + ) + description = IliasPage._find_link_description(link) # The last meeting on every page is expanded by default. # Its content is then shown inline *and* in the meeting page itself. @@ -805,10 +998,10 @@ class IliasPage: if not element_type: continue elif element_type == IliasElementType.FILE: - result.append(self._file_to_element(element_name, abs_url, link)) + result.append(IliasPage._file_to_element(element_name, abs_url, link)) continue - log.explain(f"Found {element_name!r}") + log.explain(f"Found {element_name!r} of type {element_type}") result.append(IliasPageElement.create_new( element_type, abs_url, @@ -826,71 +1019,92 @@ class IliasPage: def _find_mediacast_videos(self) -> list[IliasPageElement]: videos: list[IliasPageElement] = [] - for elem in cast(list[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")): - element_name = _sanitize_path_name( - cast(Tag, elem.select_one(".ilPlayerPreviewDescription")).get_text().strip() - ) - if not element_name.endswith(".mp4"): - # just to make sure it has some kinda-alrightish ending - element_name = element_name + ".mp4" - video_element = cast(Optional[Tag], elem.find(name="video")) - if not video_element: - _unexpected_html_warning() - log.warn_contd(f"No