mirror of
https://github.com/Garmelon/PFERD.git
synced 2025-07-12 22:22:30 +02:00
Support ILIAS 9
This commit is contained in:
@ -22,8 +22,14 @@ ambiguous situations.
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Support for ILIAS 9
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Added prettier CSS to forum threads
|
- Added prettier CSS to forum threads
|
||||||
|
- Downloaded forum threads now link to the forum instead of the ILIAS thread
|
||||||
|
- Increase minimum supported Python version to 3.11
|
||||||
|
- Do not crawl nested courses (courses linked in other courses)
|
||||||
|
|
||||||
## Fixed
|
## Fixed
|
||||||
- File links in report on Windows
|
- File links in report on Windows
|
||||||
|
@ -22,7 +22,7 @@ from .async_helper import _iorepeat
|
|||||||
from .file_templates import Links, forum_thread_template, learning_module_template
|
from .file_templates import Links, forum_thread_template, learning_module_template
|
||||||
from .ilias_html_cleaner import clean, insert_base_markup
|
from .ilias_html_cleaner import clean, insert_base_markup
|
||||||
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
|
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
|
||||||
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
|
IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
|
||||||
from .shibboleth_login import ShibbolethLogin
|
from .shibboleth_login import ShibbolethLogin
|
||||||
|
|
||||||
TargetType = Union[str, int]
|
TargetType = Union[str, int]
|
||||||
@ -105,7 +105,6 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
|
|||||||
|
|
||||||
|
|
||||||
_DIRECTORY_PAGES: Set[IliasElementType] = {
|
_DIRECTORY_PAGES: Set[IliasElementType] = {
|
||||||
IliasElementType.COURSE,
|
|
||||||
IliasElementType.EXERCISE,
|
IliasElementType.EXERCISE,
|
||||||
IliasElementType.EXERCISE_FILES,
|
IliasElementType.EXERCISE_FILES,
|
||||||
IliasElementType.FOLDER,
|
IliasElementType.FOLDER,
|
||||||
@ -217,11 +216,19 @@ instance's greatest bottleneck.
|
|||||||
|
|
||||||
async def _crawl_desktop(self) -> None:
|
async def _crawl_desktop(self) -> None:
|
||||||
await self._crawl_url(
|
await self._crawl_url(
|
||||||
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show")
|
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"),
|
||||||
|
crawl_nested_courses=True
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
|
async def _crawl_url(
|
||||||
if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id):
|
self,
|
||||||
|
url: str,
|
||||||
|
expected_id: Optional[int] = None,
|
||||||
|
crawl_nested_courses: bool = False
|
||||||
|
) -> None:
|
||||||
|
if awaitable := await self._handle_ilias_page(
|
||||||
|
url, None, PurePath("."), expected_id, crawl_nested_courses
|
||||||
|
):
|
||||||
await awaitable
|
await awaitable
|
||||||
|
|
||||||
async def _handle_ilias_page(
|
async def _handle_ilias_page(
|
||||||
@ -230,6 +237,7 @@ instance's greatest bottleneck.
|
|||||||
current_element: Optional[IliasPageElement],
|
current_element: Optional[IliasPageElement],
|
||||||
path: PurePath,
|
path: PurePath,
|
||||||
expected_course_id: Optional[int] = None,
|
expected_course_id: Optional[int] = None,
|
||||||
|
crawl_nested_courses: bool = False
|
||||||
) -> Optional[Coroutine[Any, Any, None]]:
|
) -> Optional[Coroutine[Any, Any, None]]:
|
||||||
maybe_cl = await self.crawl(path)
|
maybe_cl = await self.crawl(path)
|
||||||
if not maybe_cl:
|
if not maybe_cl:
|
||||||
@ -237,7 +245,9 @@ instance's greatest bottleneck.
|
|||||||
if current_element:
|
if current_element:
|
||||||
self._ensure_not_seen(current_element, path)
|
self._ensure_not_seen(current_element, path)
|
||||||
|
|
||||||
return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id)
|
return self._crawl_ilias_page(
|
||||||
|
url, current_element, maybe_cl, expected_course_id, crawl_nested_courses
|
||||||
|
)
|
||||||
|
|
||||||
@anoncritical
|
@anoncritical
|
||||||
async def _crawl_ilias_page(
|
async def _crawl_ilias_page(
|
||||||
@ -246,6 +256,7 @@ instance's greatest bottleneck.
|
|||||||
current_element: Optional[IliasPageElement],
|
current_element: Optional[IliasPageElement],
|
||||||
cl: CrawlToken,
|
cl: CrawlToken,
|
||||||
expected_course_id: Optional[int] = None,
|
expected_course_id: Optional[int] = None,
|
||||||
|
crawl_nested_courses: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
elements: List[IliasPageElement] = []
|
elements: List[IliasPageElement] = []
|
||||||
# A list as variable redefinitions are not propagated to outer scopes
|
# A list as variable redefinitions are not propagated to outer scopes
|
||||||
@ -267,12 +278,12 @@ instance's greatest bottleneck.
|
|||||||
# If we expect to find a root course, enforce it
|
# If we expect to find a root course, enforce it
|
||||||
if current_parent is None and expected_course_id is not None:
|
if current_parent is None and expected_course_id is not None:
|
||||||
perma_link = IliasPage.get_soup_permalink(soup)
|
perma_link = IliasPage.get_soup_permalink(soup)
|
||||||
if not perma_link or "crs_" not in perma_link:
|
if not perma_link or "crs/" not in perma_link:
|
||||||
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
|
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
|
||||||
if str(expected_course_id) not in perma_link:
|
if str(expected_course_id) not in perma_link:
|
||||||
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
|
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
|
||||||
|
|
||||||
page = IliasPage(soup, next_stage_url, current_parent)
|
page = IliasPage(soup, current_parent)
|
||||||
if next_element := page.get_next_stage_element():
|
if next_element := page.get_next_stage_element():
|
||||||
current_parent = next_element
|
current_parent = next_element
|
||||||
next_stage_url = next_element.url
|
next_stage_url = next_element.url
|
||||||
@ -294,7 +305,7 @@ instance's greatest bottleneck.
|
|||||||
|
|
||||||
tasks: List[Awaitable[None]] = []
|
tasks: List[Awaitable[None]] = []
|
||||||
for element in elements:
|
for element in elements:
|
||||||
if handle := await self._handle_ilias_element(cl.path, element):
|
if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses):
|
||||||
tasks.append(asyncio.create_task(handle))
|
tasks.append(asyncio.create_task(handle))
|
||||||
|
|
||||||
# And execute them
|
# And execute them
|
||||||
@ -310,6 +321,7 @@ instance's greatest bottleneck.
|
|||||||
self,
|
self,
|
||||||
parent_path: PurePath,
|
parent_path: PurePath,
|
||||||
element: IliasPageElement,
|
element: IliasPageElement,
|
||||||
|
crawl_nested_courses: bool = False
|
||||||
) -> Optional[Coroutine[Any, Any, None]]:
|
) -> Optional[Coroutine[Any, Any, None]]:
|
||||||
# element.name might contain `/` if the crawler created nested elements,
|
# element.name might contain `/` if the crawler created nested elements,
|
||||||
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
|
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
|
||||||
@ -362,6 +374,56 @@ instance's greatest bottleneck.
|
|||||||
"[bright_black](scorm learning modules are not supported)"
|
"[bright_black](scorm learning modules are not supported)"
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
elif element.type == IliasElementType.LITERATURE_LIST:
|
||||||
|
log.status(
|
||||||
|
"[bold bright_black]",
|
||||||
|
"Ignored",
|
||||||
|
fmt_path(element_path),
|
||||||
|
"[bright_black](literature lists are not currently supported)"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
|
||||||
|
log.status(
|
||||||
|
"[bold bright_black]",
|
||||||
|
"Ignored",
|
||||||
|
fmt_path(element_path),
|
||||||
|
"[bright_black](HTML learning modules are not supported)"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
elif element.type == IliasElementType.BLOG:
|
||||||
|
log.status(
|
||||||
|
"[bold bright_black]",
|
||||||
|
"Ignored",
|
||||||
|
fmt_path(element_path),
|
||||||
|
"[bright_black](blogs are not currently supported)"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
elif element.type == IliasElementType.DCL_RECORD_LIST:
|
||||||
|
log.status(
|
||||||
|
"[bold bright_black]",
|
||||||
|
"Ignored",
|
||||||
|
fmt_path(element_path),
|
||||||
|
"[bright_black](dcl record lists are not currently supported)"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
elif element.type == IliasElementType.MEDIA_POOL:
|
||||||
|
log.status(
|
||||||
|
"[bold bright_black]",
|
||||||
|
"Ignored",
|
||||||
|
fmt_path(element_path),
|
||||||
|
"[bright_black](media pools are not currently supported)"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
elif element.type == IliasElementType.COURSE:
|
||||||
|
if crawl_nested_courses:
|
||||||
|
return await self._handle_ilias_page(element.url, element, element_path)
|
||||||
|
log.status(
|
||||||
|
"[bold bright_black]",
|
||||||
|
"Ignored",
|
||||||
|
fmt_path(element_path),
|
||||||
|
"[bright_black](not descending into linked course)"
|
||||||
|
)
|
||||||
|
return None
|
||||||
elif element.type == IliasElementType.LEARNING_MODULE:
|
elif element.type == IliasElementType.LEARNING_MODULE:
|
||||||
return await self._handle_learning_module(element, element_path)
|
return await self._handle_learning_module(element, element_path)
|
||||||
elif element.type == IliasElementType.LINK:
|
elif element.type == IliasElementType.LINK:
|
||||||
@ -590,7 +652,7 @@ instance's greatest bottleneck.
|
|||||||
)
|
)
|
||||||
|
|
||||||
async with dl as (bar, sink):
|
async with dl as (bar, sink):
|
||||||
page = IliasPage(await self._get_page(element.url), element.url, element)
|
page = IliasPage(await self._get_page(element.url), element)
|
||||||
stream_elements = page.get_child_elements()
|
stream_elements = page.get_child_elements()
|
||||||
|
|
||||||
if len(stream_elements) > 1:
|
if len(stream_elements) > 1:
|
||||||
@ -600,7 +662,7 @@ instance's greatest bottleneck.
|
|||||||
stream_element = stream_elements[0]
|
stream_element = stream_elements[0]
|
||||||
|
|
||||||
# We do not have a local cache yet
|
# We do not have a local cache yet
|
||||||
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
|
await self._stream_from_url(stream_element, sink, bar, is_video=True)
|
||||||
add_to_report([str(self._transformer.transform(dl.path))])
|
add_to_report([str(self._transformer.transform(dl.path))])
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -615,7 +677,7 @@ instance's greatest bottleneck.
|
|||||||
async with maybe_dl as (bar, sink):
|
async with maybe_dl as (bar, sink):
|
||||||
log.explain(f"Streaming video from real url {stream_element.url}")
|
log.explain(f"Streaming video from real url {stream_element.url}")
|
||||||
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
|
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
|
||||||
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
|
await self._stream_from_url(stream_element, sink, bar, is_video=True)
|
||||||
|
|
||||||
add_to_report(contained_video_paths)
|
add_to_report(contained_video_paths)
|
||||||
|
|
||||||
@ -637,12 +699,19 @@ instance's greatest bottleneck.
|
|||||||
async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
|
async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
|
||||||
assert dl # The function is only reached when dl is not None
|
assert dl # The function is only reached when dl is not None
|
||||||
async with dl as (bar, sink):
|
async with dl as (bar, sink):
|
||||||
await self._stream_from_url(element.url, sink, bar, is_video)
|
await self._stream_from_url(element, sink, bar, is_video)
|
||||||
|
|
||||||
|
async def _stream_from_url(
|
||||||
|
self,
|
||||||
|
element: IliasPageElement,
|
||||||
|
sink: FileSink,
|
||||||
|
bar: ProgressBar,
|
||||||
|
is_video: bool
|
||||||
|
) -> None:
|
||||||
|
url = element.url
|
||||||
|
|
||||||
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
|
|
||||||
async def try_stream() -> bool:
|
async def try_stream() -> bool:
|
||||||
next_url = url
|
next_url = url
|
||||||
|
|
||||||
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
|
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
|
||||||
# we can not match on the content type here. Instead, we disallow redirects and inspect the
|
# we can not match on the content type here. Instead, we disallow redirects and inspect the
|
||||||
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
|
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
|
||||||
@ -690,7 +759,7 @@ instance's greatest bottleneck.
|
|||||||
await self.authenticate(auth_id)
|
await self.authenticate(auth_id)
|
||||||
|
|
||||||
if not await try_stream():
|
if not await try_stream():
|
||||||
raise CrawlError("File streaming failed after authenticate()")
|
raise CrawlError(f"File streaming failed after authenticate() {element!r}")
|
||||||
|
|
||||||
async def _handle_forum(
|
async def _handle_forum(
|
||||||
self,
|
self,
|
||||||
@ -705,70 +774,23 @@ instance's greatest bottleneck.
|
|||||||
@_iorepeat(3, "crawling forum")
|
@_iorepeat(3, "crawling forum")
|
||||||
@anoncritical
|
@anoncritical
|
||||||
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
|
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
|
||||||
elements: List[IliasForumThread] = []
|
|
||||||
|
|
||||||
async with cl:
|
async with cl:
|
||||||
next_stage_url = element.url
|
inner = IliasPage(await self._get_page(element.url), element)
|
||||||
page = None
|
export_url = inner.get_forum_export_url()
|
||||||
|
if not export_url:
|
||||||
while next_stage_url:
|
log.warn("Could not extract forum export url")
|
||||||
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
|
|
||||||
log.explain(f"URL: {next_stage_url}")
|
|
||||||
|
|
||||||
soup = await self._get_page(next_stage_url)
|
|
||||||
page = IliasPage(soup, next_stage_url, element)
|
|
||||||
|
|
||||||
if next := page.get_next_stage_element():
|
|
||||||
next_stage_url = next.url
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
forum_threads: list[tuple[IliasPageElement, bool]] = []
|
|
||||||
for entry in cast(IliasPage, page).get_forum_entries():
|
|
||||||
path = cl.path / (_sanitize_path_name(entry.name) + ".html")
|
|
||||||
forum_threads.append((entry, self.should_try_download(path, mtime=entry.mtime)))
|
|
||||||
|
|
||||||
# Sort the ids. The forum download will *preserve* this ordering
|
|
||||||
forum_threads.sort(key=lambda elem: elem[0].id())
|
|
||||||
|
|
||||||
if not forum_threads:
|
|
||||||
log.explain("Forum had no threads")
|
|
||||||
return
|
return
|
||||||
|
|
||||||
download_data = cast(IliasPage, page).get_download_forum_data(
|
export = await self._post(export_url, {
|
||||||
[thread.id() for thread, download in forum_threads if download]
|
"format": "html",
|
||||||
)
|
"cmd[createExportFile]": ""
|
||||||
if not download_data:
|
})
|
||||||
raise CrawlWarning("Failed to extract forum data")
|
|
||||||
|
|
||||||
if not download_data.empty:
|
elements = parse_ilias_forum_export(soupify(export))
|
||||||
html = await self._post_authenticated(download_data.url, download_data.form_data)
|
|
||||||
elements = parse_ilias_forum_export(soupify(html))
|
|
||||||
else:
|
|
||||||
elements = []
|
|
||||||
|
|
||||||
# Verify that ILIAS does not change the order, as we depend on it later. Otherwise, we could not call
|
|
||||||
# download in the correct order, potentially messing up duplication handling.
|
|
||||||
expected_element_titles = [thread.name for thread, download in forum_threads if download]
|
|
||||||
actual_element_titles = [_sanitize_path_name(thread.name) for thread in elements]
|
|
||||||
if expected_element_titles != actual_element_titles:
|
|
||||||
raise CrawlWarning(
|
|
||||||
f"Forum thread order mismatch: {expected_element_titles} != {actual_element_titles}"
|
|
||||||
)
|
|
||||||
|
|
||||||
tasks: List[Awaitable[None]] = []
|
tasks: List[Awaitable[None]] = []
|
||||||
for thread, download in forum_threads:
|
for thread in elements:
|
||||||
if download:
|
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url)))
|
||||||
# This only works because ILIAS keeps the order in the export
|
|
||||||
elem = elements.pop(0)
|
|
||||||
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem, thread)))
|
|
||||||
else:
|
|
||||||
# We only downloaded the threads we "should_try_download"ed. This can be an
|
|
||||||
# over-approximation and all will be fine.
|
|
||||||
# If we selected too few, e.g. because there was a duplicate title and the mtime of the
|
|
||||||
# original is newer than the update of the duplicate.
|
|
||||||
# This causes stale data locally, but I consider this problem acceptable right now.
|
|
||||||
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, thread)))
|
|
||||||
|
|
||||||
# And execute them
|
# And execute them
|
||||||
await self.gather(tasks)
|
await self.gather(tasks)
|
||||||
@ -779,7 +801,7 @@ instance's greatest bottleneck.
|
|||||||
self,
|
self,
|
||||||
parent_path: PurePath,
|
parent_path: PurePath,
|
||||||
thread: Union[IliasForumThread, IliasPageElement],
|
thread: Union[IliasForumThread, IliasPageElement],
|
||||||
element: IliasPageElement
|
forum_url: str
|
||||||
) -> None:
|
) -> None:
|
||||||
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
|
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
|
||||||
maybe_dl = await self.download(path, mtime=thread.mtime)
|
maybe_dl = await self.download(path, mtime=thread.mtime)
|
||||||
@ -789,7 +811,7 @@ instance's greatest bottleneck.
|
|||||||
async with maybe_dl as (bar, sink):
|
async with maybe_dl as (bar, sink):
|
||||||
rendered = forum_thread_template(
|
rendered = forum_thread_template(
|
||||||
thread.name,
|
thread.name,
|
||||||
element.url,
|
forum_url,
|
||||||
thread.name_tag,
|
thread.name_tag,
|
||||||
await self.internalize_images(thread.content_tag)
|
await self.internalize_images(thread.content_tag)
|
||||||
)
|
)
|
||||||
@ -817,7 +839,7 @@ instance's greatest bottleneck.
|
|||||||
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
|
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
|
||||||
log.explain(f"URL: {element.url}")
|
log.explain(f"URL: {element.url}")
|
||||||
soup = await self._get_page(element.url)
|
soup = await self._get_page(element.url)
|
||||||
page = IliasPage(soup, element.url, element)
|
page = IliasPage(soup, element)
|
||||||
if next := page.get_learning_module_data():
|
if next := page.get_learning_module_data():
|
||||||
elements.extend(await self._crawl_learning_module_direction(
|
elements.extend(await self._crawl_learning_module_direction(
|
||||||
cl.path, next.previous_url, "left", element
|
cl.path, next.previous_url, "left", element
|
||||||
@ -860,7 +882,7 @@ instance's greatest bottleneck.
|
|||||||
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
|
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
|
||||||
log.explain(f"URL: {next_element_url}")
|
log.explain(f"URL: {next_element_url}")
|
||||||
soup = await self._get_page(next_element_url)
|
soup = await self._get_page(next_element_url)
|
||||||
page = IliasPage(soup, next_element_url, parent_element)
|
page = IliasPage(soup, parent_element)
|
||||||
if next := page.get_learning_module_data():
|
if next := page.get_learning_module_data():
|
||||||
elements.append(next)
|
elements.append(next)
|
||||||
if dir == "left":
|
if dir == "left":
|
||||||
@ -891,13 +913,13 @@ instance's greatest bottleneck.
|
|||||||
if prev:
|
if prev:
|
||||||
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
|
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
|
||||||
if prev_p:
|
if prev_p:
|
||||||
prev = os.path.relpath(prev_p, my_path.parent)
|
prev = cast(str, os.path.relpath(prev_p, my_path.parent))
|
||||||
else:
|
else:
|
||||||
prev = None
|
prev = None
|
||||||
if next:
|
if next:
|
||||||
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
|
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
|
||||||
if next_p:
|
if next_p:
|
||||||
next = os.path.relpath(next_p, my_path.parent)
|
next = cast(str, os.path.relpath(next_p, my_path.parent))
|
||||||
else:
|
else:
|
||||||
next = None
|
next = None
|
||||||
|
|
||||||
@ -937,10 +959,10 @@ instance's greatest bottleneck.
|
|||||||
)
|
)
|
||||||
self._visited_urls[element.url] = parent_path
|
self._visited_urls[element.url] = parent_path
|
||||||
|
|
||||||
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
|
async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup:
|
||||||
auth_id = await self._current_auth_id()
|
auth_id = await self._current_auth_id()
|
||||||
async with self.session.get(url) as request:
|
async with self.session.get(url) as request:
|
||||||
soup = soupify(await request.read())
|
soup = IliasSoup(soupify(await request.read()), str(request.url))
|
||||||
if IliasPage.is_logged_in(soup):
|
if IliasPage.is_logged_in(soup):
|
||||||
return self._verify_page(soup, url, root_page_allowed)
|
return self._verify_page(soup, url, root_page_allowed)
|
||||||
|
|
||||||
@ -949,13 +971,13 @@ instance's greatest bottleneck.
|
|||||||
|
|
||||||
# Retry once after authenticating. If this fails, we will die.
|
# Retry once after authenticating. If this fails, we will die.
|
||||||
async with self.session.get(url) as request:
|
async with self.session.get(url) as request:
|
||||||
soup = soupify(await request.read())
|
soup = IliasSoup(soupify(await request.read()), str(request.url))
|
||||||
if IliasPage.is_logged_in(soup):
|
if IliasPage.is_logged_in(soup):
|
||||||
return self._verify_page(soup, url, root_page_allowed)
|
return self._verify_page(soup, url, root_page_allowed)
|
||||||
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
|
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
|
def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup:
|
||||||
if IliasPage.is_root_page(soup) and not root_page_allowed:
|
if IliasPage.is_root_page(soup) and not root_page_allowed:
|
||||||
raise CrawlError(
|
raise CrawlError(
|
||||||
"Unexpectedly encountered ILIAS root page. "
|
"Unexpectedly encountered ILIAS root page. "
|
||||||
@ -967,29 +989,19 @@ instance's greatest bottleneck.
|
|||||||
)
|
)
|
||||||
return soup
|
return soup
|
||||||
|
|
||||||
async def _post_authenticated(
|
async def _post(
|
||||||
self,
|
self,
|
||||||
url: str,
|
url: str,
|
||||||
data: dict[str, Union[str, List[str]]]
|
data: dict[str, Union[str, List[str]]]
|
||||||
) -> bytes:
|
) -> bytes:
|
||||||
auth_id = await self._current_auth_id()
|
|
||||||
|
|
||||||
form_data = aiohttp.FormData()
|
form_data = aiohttp.FormData()
|
||||||
for key, val in data.items():
|
for key, val in data.items():
|
||||||
form_data.add_field(key, val)
|
form_data.add_field(key, val)
|
||||||
|
|
||||||
async with self.session.post(url, data=form_data(), allow_redirects=False) as request:
|
async with self.session.post(url, data=form_data()) as request:
|
||||||
if request.status == 200:
|
if request.status == 200:
|
||||||
return await request.read()
|
return await request.read()
|
||||||
|
raise CrawlError(f"post failed with status {request.status}")
|
||||||
# We weren't authenticated, so try to do that
|
|
||||||
await self.authenticate(auth_id)
|
|
||||||
|
|
||||||
# Retry once after authenticating. If this fails, we will die.
|
|
||||||
async with self.session.post(url, data=data, allow_redirects=False) as request:
|
|
||||||
if request.status == 200:
|
|
||||||
return await request.read()
|
|
||||||
raise CrawlError("post_authenticated failed even after authenticating")
|
|
||||||
|
|
||||||
async def _get_authenticated(self, url: str) -> bytes:
|
async def _get_authenticated(self, url: str) -> bytes:
|
||||||
auth_id = await self._current_auth_id()
|
auth_id = await self._current_auth_id()
|
||||||
@ -1037,34 +1049,6 @@ instance's greatest bottleneck.
|
|||||||
|
|
||||||
# do the actual login
|
# do the actual login
|
||||||
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
|
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
|
||||||
soup = soupify(await request.read())
|
soup = IliasSoup(soupify(await request.read()), str(request.url))
|
||||||
if not self._is_logged_in(soup):
|
if not IliasPage.is_logged_in(soup):
|
||||||
self._auth.invalidate_credentials()
|
self._auth.invalidate_credentials()
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _is_logged_in(soup: BeautifulSoup) -> bool:
|
|
||||||
# Normal ILIAS pages
|
|
||||||
mainbar = cast(Optional[Tag], soup.find(class_="il-maincontrols-metabar"))
|
|
||||||
if mainbar is not None:
|
|
||||||
login_button = mainbar.find(attrs={"href": lambda x: x is not None and "login.php" in x})
|
|
||||||
shib_login = soup.find(id="button_shib_login")
|
|
||||||
return not login_button and not shib_login
|
|
||||||
|
|
||||||
# Personal Desktop
|
|
||||||
if soup.find("a", attrs={"href": lambda x: x is not None and "block_type=pditems" in x}):
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Video listing embeds do not have complete ILIAS html. Try to match them by
|
|
||||||
# their video listing table
|
|
||||||
video_table = soup.find(
|
|
||||||
recursive=True,
|
|
||||||
name="table",
|
|
||||||
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
|
|
||||||
)
|
|
||||||
if video_table is not None:
|
|
||||||
return True
|
|
||||||
# The individual video player wrapper page has nothing of the above.
|
|
||||||
# Match it by its playerContainer.
|
|
||||||
if soup.select_one("#playerContainer") is not None:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
@ -3,20 +3,100 @@ import re
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import date, datetime, timedelta
|
from datetime import date, datetime, timedelta
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Dict, Optional, Union, cast
|
from typing import Callable, Dict, Optional, Union, cast
|
||||||
from urllib.parse import urljoin, urlparse
|
from urllib.parse import urljoin, urlparse
|
||||||
|
|
||||||
from bs4 import BeautifulSoup, Tag
|
from bs4 import BeautifulSoup, Tag
|
||||||
|
|
||||||
|
from PFERD.crawl import CrawlError
|
||||||
|
from PFERD.crawl.crawler import CrawlWarning
|
||||||
from PFERD.logging import log
|
from PFERD.logging import log
|
||||||
from PFERD.utils import url_set_query_params
|
from PFERD.utils import url_set_query_params
|
||||||
|
|
||||||
TargetType = Union[str, int]
|
TargetType = Union[str, int]
|
||||||
|
|
||||||
|
|
||||||
|
class TypeMatcher:
|
||||||
|
class UrlPath:
|
||||||
|
path: str
|
||||||
|
|
||||||
|
def __init__(self, path: str):
|
||||||
|
self.path = path
|
||||||
|
|
||||||
|
class UrlParameter:
|
||||||
|
query: str
|
||||||
|
|
||||||
|
def __init__(self, query: str):
|
||||||
|
self.query = query
|
||||||
|
|
||||||
|
class ImgSrc:
|
||||||
|
src: str
|
||||||
|
|
||||||
|
def __init__(self, src: str):
|
||||||
|
self.src = src
|
||||||
|
|
||||||
|
class ImgAlt:
|
||||||
|
alt: str
|
||||||
|
|
||||||
|
def __init__(self, alt: str):
|
||||||
|
self.alt = alt
|
||||||
|
|
||||||
|
class All:
|
||||||
|
matchers: list['IliasElementMatcher']
|
||||||
|
|
||||||
|
def __init__(self, matchers: list['IliasElementMatcher']):
|
||||||
|
self.matchers = matchers
|
||||||
|
|
||||||
|
class Any:
|
||||||
|
matchers: list['IliasElementMatcher']
|
||||||
|
|
||||||
|
def __init__(self, matchers: list['IliasElementMatcher']):
|
||||||
|
self.matchers = matchers
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def path(path: str) -> UrlPath:
|
||||||
|
return TypeMatcher.UrlPath(path)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def query(query: str) -> UrlParameter:
|
||||||
|
return TypeMatcher.UrlParameter(query)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def img_src(src: str) -> ImgSrc:
|
||||||
|
return TypeMatcher.ImgSrc(src)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def img_alt(alt: str) -> ImgAlt:
|
||||||
|
return TypeMatcher.ImgAlt(alt)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def all(*matchers: 'IliasElementMatcher') -> All:
|
||||||
|
return TypeMatcher.All(list(matchers))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def any(*matchers: 'IliasElementMatcher') -> Any:
|
||||||
|
return TypeMatcher.Any(list(matchers))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def never() -> Any:
|
||||||
|
return TypeMatcher.Any([])
|
||||||
|
|
||||||
|
|
||||||
|
IliasElementMatcher = (
|
||||||
|
TypeMatcher.UrlPath
|
||||||
|
| TypeMatcher.UrlParameter
|
||||||
|
| TypeMatcher.ImgSrc
|
||||||
|
| TypeMatcher.ImgAlt
|
||||||
|
| TypeMatcher.All
|
||||||
|
| TypeMatcher.Any
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class IliasElementType(Enum):
|
class IliasElementType(Enum):
|
||||||
|
BLOG = "blog"
|
||||||
BOOKING = "booking"
|
BOOKING = "booking"
|
||||||
COURSE = "course"
|
COURSE = "course"
|
||||||
|
DCL_RECORD_LIST = "dcl_record_list"
|
||||||
EXERCISE = "exercise"
|
EXERCISE = "exercise"
|
||||||
EXERCISE_FILES = "exercise_files" # own submitted files
|
EXERCISE_FILES = "exercise_files" # own submitted files
|
||||||
FILE = "file"
|
FILE = "file"
|
||||||
@ -25,7 +105,10 @@ class IliasElementType(Enum):
|
|||||||
FORUM_THREAD = "forum_thread"
|
FORUM_THREAD = "forum_thread"
|
||||||
INFO_TAB = "info_tab"
|
INFO_TAB = "info_tab"
|
||||||
LEARNING_MODULE = "learning_module"
|
LEARNING_MODULE = "learning_module"
|
||||||
|
LEARNING_MODULE_HTML = "learning_module_html"
|
||||||
|
LITERATURE_LIST = "literature_list"
|
||||||
LINK = "link"
|
LINK = "link"
|
||||||
|
MEDIA_POOL = "media_pool"
|
||||||
MEDIACAST_VIDEO = "mediacast_video"
|
MEDIACAST_VIDEO = "mediacast_video"
|
||||||
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
|
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
|
||||||
MEETING = "meeting"
|
MEETING = "meeting"
|
||||||
@ -38,6 +121,131 @@ class IliasElementType(Enum):
|
|||||||
SURVEY = "survey"
|
SURVEY = "survey"
|
||||||
TEST = "test" # an online test. Will be ignored currently.
|
TEST = "test" # an online test. Will be ignored currently.
|
||||||
|
|
||||||
|
def matcher(self) -> IliasElementMatcher:
|
||||||
|
match self:
|
||||||
|
case IliasElementType.BLOG:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.img_src("_blog.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.BOOKING:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/book/"),
|
||||||
|
TypeMatcher.img_src("_book.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.COURSE:
|
||||||
|
return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg"))
|
||||||
|
case IliasElementType.DCL_RECORD_LIST:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.img_src("_dcl.svg"),
|
||||||
|
TypeMatcher.query("cmdclass=ildclrecordlistgui")
|
||||||
|
)
|
||||||
|
case IliasElementType.EXERCISE:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/exc/"),
|
||||||
|
TypeMatcher.path("_exc_"),
|
||||||
|
TypeMatcher.img_src("_exc.svg"),
|
||||||
|
)
|
||||||
|
case IliasElementType.EXERCISE_FILES:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.FILE:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.query("cmd=sendfile"),
|
||||||
|
TypeMatcher.path("_file_"),
|
||||||
|
TypeMatcher.img_src("/filedelivery/"),
|
||||||
|
)
|
||||||
|
case IliasElementType.FOLDER:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/fold/"),
|
||||||
|
TypeMatcher.img_src("_fold.svg"),
|
||||||
|
|
||||||
|
TypeMatcher.path("/grp/"),
|
||||||
|
TypeMatcher.img_src("_grp.svg"),
|
||||||
|
|
||||||
|
TypeMatcher.path("/copa/"),
|
||||||
|
TypeMatcher.path("_copa_"),
|
||||||
|
TypeMatcher.img_src("_copa.svg"),
|
||||||
|
|
||||||
|
# Not supported right now but warn users
|
||||||
|
# TypeMatcher.query("baseclass=ilmediapoolpresentationgui"),
|
||||||
|
# TypeMatcher.img_alt("medienpool"),
|
||||||
|
# TypeMatcher.img_src("_mep.svg"),
|
||||||
|
)
|
||||||
|
case IliasElementType.FORUM:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/frm/"),
|
||||||
|
TypeMatcher.path("_frm_"),
|
||||||
|
TypeMatcher.img_src("_frm.svg"),
|
||||||
|
)
|
||||||
|
case IliasElementType.FORUM_THREAD:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.INFO_TAB:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.LITERATURE_LIST:
|
||||||
|
return TypeMatcher.img_src("_bibl.svg")
|
||||||
|
case IliasElementType.LEARNING_MODULE:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/lm/"),
|
||||||
|
TypeMatcher.img_src("_lm.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.LEARNING_MODULE_HTML:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.query("baseclass=ilhtlmpresentationgui"),
|
||||||
|
TypeMatcher.img_src("_htlm.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.LINK:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.all(
|
||||||
|
TypeMatcher.query("baseclass=illinkresourcehandlergui"),
|
||||||
|
TypeMatcher.query("calldirectlink"),
|
||||||
|
),
|
||||||
|
TypeMatcher.img_src("_webr.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.MEDIA_POOL:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.query("baseclass=ilmediapoolpresentationgui"),
|
||||||
|
TypeMatcher.img_src("_mep.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.MEDIACAST_VIDEO:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.MEDIACAST_VIDEO_FOLDER:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/mcst/"),
|
||||||
|
TypeMatcher.query("baseclass=ilmediacasthandlergui"),
|
||||||
|
TypeMatcher.img_src("_mcst.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.MEETING:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.img_src("_sess.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.MOB_VIDEO:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.OPENCAST_VIDEO:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.OPENCAST_VIDEO_FOLDER:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
|
||||||
|
return TypeMatcher.img_alt("opencast")
|
||||||
|
case IliasElementType.OPENCAST_VIDEO_PLAYER:
|
||||||
|
return TypeMatcher.never()
|
||||||
|
case IliasElementType.SCORM_LEARNING_MODULE:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.query("baseclass=ilsahspresentationgui"),
|
||||||
|
TypeMatcher.img_src("_sahs.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.SURVEY:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.path("/svy/"),
|
||||||
|
TypeMatcher.img_src("svy.svg")
|
||||||
|
)
|
||||||
|
case IliasElementType.TEST:
|
||||||
|
return TypeMatcher.any(
|
||||||
|
TypeMatcher.query("cmdclass=ilobjtestgui"),
|
||||||
|
TypeMatcher.query("cmdclass=iltestscreengui"),
|
||||||
|
TypeMatcher.img_src("_tst.svg")
|
||||||
|
)
|
||||||
|
|
||||||
|
raise CrawlWarning(f"Unknown matcher {self}")
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class IliasPageElement:
|
class IliasPageElement:
|
||||||
@ -50,11 +258,21 @@ class IliasPageElement:
|
|||||||
def id(self) -> str:
|
def id(self) -> str:
|
||||||
regexes = [
|
regexes = [
|
||||||
r"eid=(?P<id>[0-9a-z\-]+)",
|
r"eid=(?P<id>[0-9a-z\-]+)",
|
||||||
r"file_(?P<id>\d+)",
|
r"book/(?P<id>\d+)", # booking
|
||||||
r"copa_(?P<id>\d+)",
|
r"cat/(?P<id>\d+)",
|
||||||
r"fold_(?P<id>\d+)",
|
r"copa/(?P<id>\d+)", # content page
|
||||||
r"frm_(?P<id>\d+)",
|
r"crs/(?P<id>\d+)", # course
|
||||||
r"exc_(?P<id>\d+)",
|
r"exc/(?P<id>\d+)", # exercise
|
||||||
|
r"file/(?P<id>\d+)", # file
|
||||||
|
r"fold/(?P<id>\d+)", # folder
|
||||||
|
r"frm/(?P<id>\d+)", # forum
|
||||||
|
r"grp/(?P<id>\d+)", # group
|
||||||
|
r"lm/(?P<id>\d+)", # learning module
|
||||||
|
r"mcst/(?P<id>\d+)", # mediacast
|
||||||
|
r"pg/(?P<id>(\d|_)+)", # page?
|
||||||
|
r"svy/(?P<id>\d+)", # survey
|
||||||
|
r"sess/(?P<id>\d+)", # session
|
||||||
|
r"webr/(?P<id>\d+)", # web referene (link)
|
||||||
r"thr_pk=(?P<id>\d+)", # forums
|
r"thr_pk=(?P<id>\d+)", # forums
|
||||||
r"ref_id=(?P<id>\d+)",
|
r"ref_id=(?P<id>\d+)",
|
||||||
r"target=[a-z]+_(?P<id>\d+)",
|
r"target=[a-z]+_(?P<id>\d+)",
|
||||||
@ -139,18 +357,28 @@ class IliasLearningModulePage:
|
|||||||
previous_url: Optional[str]
|
previous_url: Optional[str]
|
||||||
|
|
||||||
|
|
||||||
|
class IliasSoup:
|
||||||
|
soup: BeautifulSoup
|
||||||
|
page_url: str
|
||||||
|
|
||||||
|
def __init__(self, soup: BeautifulSoup, page_url: str):
|
||||||
|
self.soup = soup
|
||||||
|
self.page_url = page_url
|
||||||
|
|
||||||
|
|
||||||
class IliasPage:
|
class IliasPage:
|
||||||
|
|
||||||
def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
|
def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]):
|
||||||
self._soup = soup
|
self._ilias_soup = ilias_soup
|
||||||
self._page_url = _page_url
|
self._soup = ilias_soup.soup
|
||||||
|
self._page_url = ilias_soup.page_url
|
||||||
self._page_type = source_element.type if source_element else None
|
self._page_type = source_element.type if source_element else None
|
||||||
self._source_name = source_element.name if source_element else ""
|
self._source_name = source_element.name if source_element else ""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_root_page(soup: BeautifulSoup) -> bool:
|
def is_root_page(soup: IliasSoup) -> bool:
|
||||||
if permalink := IliasPage.get_soup_permalink(soup):
|
if permalink := IliasPage.get_soup_permalink(soup):
|
||||||
return "goto.php?target=root_" in permalink
|
return "goto.php/root/" in permalink
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_child_elements(self) -> list[IliasPageElement]:
|
def get_child_elements(self) -> list[IliasPageElement]:
|
||||||
@ -193,7 +421,10 @@ class IliasPage:
|
|||||||
|
|
||||||
def get_description(self) -> Optional[BeautifulSoup]:
|
def get_description(self) -> Optional[BeautifulSoup]:
|
||||||
def is_interesting_class(name: str) -> bool:
|
def is_interesting_class(name: str) -> bool:
|
||||||
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
|
return name in [
|
||||||
|
"ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap",
|
||||||
|
"ilc_va_ihcap_AccordIHeadCap", "ilc_media_cont_MediaContainer"
|
||||||
|
]
|
||||||
|
|
||||||
paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class))
|
paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class))
|
||||||
if not paragraphs:
|
if not paragraphs:
|
||||||
@ -206,6 +437,21 @@ class IliasPage:
|
|||||||
for p in paragraphs:
|
for p in paragraphs:
|
||||||
if p.find_parent(class_=is_interesting_class):
|
if p.find_parent(class_=is_interesting_class):
|
||||||
continue
|
continue
|
||||||
|
if "ilc_media_cont_MediaContainer" in p["class"]:
|
||||||
|
# We have an embedded video which should be downloaded by _find_mob_videos
|
||||||
|
if video := p.select_one("video"):
|
||||||
|
url, title = self._find_mob_video_url_title(video, p)
|
||||||
|
raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;'
|
||||||
|
raw_html += 'display: flex; justify-content: center; align-items: center;'
|
||||||
|
raw_html += ' margin: 0.5rem;">'
|
||||||
|
if url is not None and urlparse(url).hostname != urlparse(self._page_url).hostname:
|
||||||
|
if url.startswith("//"):
|
||||||
|
url = "https:" + url
|
||||||
|
raw_html += f'<a href="{url}" target="_blank">External Video: {title}</a>'
|
||||||
|
else:
|
||||||
|
raw_html += f"Video elided. Filename: '{title}'."
|
||||||
|
raw_html += "</div>\n"
|
||||||
|
continue
|
||||||
|
|
||||||
# Ignore special listings (like folder groupings)
|
# Ignore special listings (like folder groupings)
|
||||||
if "ilc_section_Special" in p["class"]:
|
if "ilc_section_Special" in p["class"]:
|
||||||
@ -244,79 +490,31 @@ class IliasPage:
|
|||||||
return url
|
return url
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_forum_entries(self) -> list[IliasPageElement]:
|
def get_forum_export_url(self) -> Optional[str]:
|
||||||
form = self._get_forum_form()
|
forum_link = self._soup.select_one("#tab_forums_threads > a")
|
||||||
if not form:
|
if not forum_link:
|
||||||
return []
|
log.explain("Found no forum link")
|
||||||
threads = []
|
|
||||||
|
|
||||||
for row in cast(list[Tag], form.select("table > tbody > tr")):
|
|
||||||
url_tag = cast(
|
|
||||||
Optional[Tag],
|
|
||||||
row.find(name="a", attrs={"href": lambda x: x is not None and "cmd=viewthread" in x.lower()})
|
|
||||||
)
|
|
||||||
if url_tag is None:
|
|
||||||
log.explain(f"Skipping row without URL: {row}")
|
|
||||||
continue
|
|
||||||
name = url_tag.get_text().strip()
|
|
||||||
columns = [td.get_text().strip() for td in cast(list[Tag], row.find_all(name="td"))]
|
|
||||||
potential_dates_opt = [IliasPage._find_date_in_text(column) for column in columns]
|
|
||||||
potential_dates = [x for x in potential_dates_opt if x is not None]
|
|
||||||
mtime = max(potential_dates) if potential_dates else None
|
|
||||||
|
|
||||||
threads.append(IliasPageElement.create_new(
|
|
||||||
IliasElementType.FORUM_THREAD,
|
|
||||||
self._abs_url_from_link(url_tag),
|
|
||||||
name,
|
|
||||||
mtime=mtime
|
|
||||||
))
|
|
||||||
|
|
||||||
return threads
|
|
||||||
|
|
||||||
def get_download_forum_data(self, thread_ids: list[str]) -> Optional[IliasDownloadForumData]:
|
|
||||||
form = cast(Optional[Tag], self._soup.find(
|
|
||||||
"form",
|
|
||||||
attrs={"action": lambda x: x is not None and "fallbackCmd=showThreads" in x}
|
|
||||||
))
|
|
||||||
if not form:
|
|
||||||
return None
|
return None
|
||||||
post_url = self._abs_url_from_relative(cast(str, form["action"]))
|
|
||||||
|
|
||||||
log.explain(f"Fetching forum threads {thread_ids}")
|
base_url = self._abs_url_from_link(forum_link)
|
||||||
|
base_url = re.sub(r"cmd=\w+", "cmd=post", base_url)
|
||||||
|
base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url)
|
||||||
|
|
||||||
form_data: Dict[str, Union[str, list[str]]] = {
|
rtoken_form = cast(
|
||||||
"thread_ids[]": cast(list[str], thread_ids),
|
Optional[Tag],
|
||||||
"selected_cmd2": "html",
|
self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x})
|
||||||
"select_cmd2": "Ausführen",
|
)
|
||||||
"selected_cmd": "",
|
if not rtoken_form:
|
||||||
}
|
log.explain("Found no rtoken anywhere")
|
||||||
|
return None
|
||||||
|
match = cast(re.Match[str], re.search(r"rtoken=(\w+)", str(rtoken_form.attrs["action"])))
|
||||||
|
rtoken = match.group(1)
|
||||||
|
|
||||||
return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0)
|
base_url = base_url + "&rtoken=" + rtoken
|
||||||
|
|
||||||
def _get_forum_form(self) -> Optional[Tag]:
|
return base_url
|
||||||
return cast(Optional[Tag], self._soup.find(
|
|
||||||
"form",
|
|
||||||
attrs={"action": lambda x: x is not None and "fallbackCmd=showThreads" in x}
|
|
||||||
))
|
|
||||||
|
|
||||||
def get_next_stage_element(self) -> Optional[IliasPageElement]:
|
def get_next_stage_element(self) -> Optional[IliasPageElement]:
|
||||||
if self._is_forum_page():
|
|
||||||
if "trows=" in self._page_url:
|
|
||||||
log.explain("Manual row override detected, accepting it as good")
|
|
||||||
return None
|
|
||||||
log.explain("Requesting *all* forum threads")
|
|
||||||
thread_count = self._get_forum_thread_count()
|
|
||||||
if thread_count is not None and thread_count > 400:
|
|
||||||
log.warn(
|
|
||||||
"Forum has more than 400 threads, fetching all threads will take a while. "
|
|
||||||
"You might need to adjust your http_timeout config option."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Fetch at least 400 in case we detect it wrong
|
|
||||||
if thread_count is not None and thread_count < 400:
|
|
||||||
thread_count = 400
|
|
||||||
|
|
||||||
return self._get_show_max_forum_entries_per_page_url(thread_count)
|
|
||||||
if self._is_ilias_opencast_embedding():
|
if self._is_ilias_opencast_embedding():
|
||||||
log.explain("Unwrapping opencast embedding")
|
log.explain("Unwrapping opencast embedding")
|
||||||
return self.get_child_elements()[0]
|
return self.get_child_elements()[0]
|
||||||
@ -334,11 +532,6 @@ class IliasPage:
|
|||||||
log.explain("Crawling info tab, skipping content select")
|
log.explain("Crawling info tab, skipping content select")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _is_forum_page(self) -> bool:
|
|
||||||
if perma_link := self.get_permalink():
|
|
||||||
return "target=frm_" in perma_link
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _is_video_player(self) -> bool:
|
def _is_video_player(self) -> bool:
|
||||||
return "paella_config_file" in str(self._soup)
|
return "paella_config_file" in str(self._soup)
|
||||||
|
|
||||||
@ -378,7 +571,7 @@ class IliasPage:
|
|||||||
|
|
||||||
def _is_content_page(self) -> bool:
|
def _is_content_page(self) -> bool:
|
||||||
if link := self.get_permalink():
|
if link := self.get_permalink():
|
||||||
return "target=copa_" in link
|
return "/copa/" in link
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _is_learning_module_page(self) -> bool:
|
def _is_learning_module_page(self) -> bool:
|
||||||
@ -513,19 +706,17 @@ class IliasPage:
|
|||||||
# Configure button/link does not have anything interesting
|
# Configure button/link does not have anything interesting
|
||||||
continue
|
continue
|
||||||
|
|
||||||
type = self._find_type_from_link(name, link, url)
|
typ = IliasPage._find_type_for_element(
|
||||||
if not type:
|
name, url, lambda: IliasPage._find_icon_for_folder_entry(link)
|
||||||
|
)
|
||||||
|
if not typ:
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(f"Could not extract type for {link}")
|
log.warn_contd(f"Could not extract type for {link}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.explain(f"Found {name!r}")
|
log.explain(f"Found {name!r} of type {typ}")
|
||||||
|
|
||||||
if type == IliasElementType.FILE and "_download" not in url:
|
items.append(IliasPageElement.create_new(typ, url, name))
|
||||||
url = re.sub(r"(target=file_\d+)", r"\1_download", url)
|
|
||||||
log.explain("Rewired file URL to include download part")
|
|
||||||
|
|
||||||
items.append(IliasPageElement.create_new(type, url, name))
|
|
||||||
|
|
||||||
return items
|
return items
|
||||||
|
|
||||||
@ -786,15 +977,17 @@ class IliasPage:
|
|||||||
for link in links:
|
for link in links:
|
||||||
abs_url = self._abs_url_from_link(link)
|
abs_url = self._abs_url_from_link(link)
|
||||||
# Make sure parents are sanitized. We do not want accidental parents
|
# Make sure parents are sanitized. We do not want accidental parents
|
||||||
parents = [_sanitize_path_name(x) for x in self._find_upwards_folder_hierarchy(link)]
|
parents = [_sanitize_path_name(x) for x in IliasPage._find_upwards_folder_hierarchy(link)]
|
||||||
|
|
||||||
if parents:
|
if parents:
|
||||||
element_name = "/".join(parents) + "/" + _sanitize_path_name(link.get_text())
|
element_name = "/".join(parents) + "/" + _sanitize_path_name(link.get_text())
|
||||||
else:
|
else:
|
||||||
element_name = _sanitize_path_name(link.get_text())
|
element_name = _sanitize_path_name(link.get_text())
|
||||||
|
|
||||||
element_type = self._find_type_from_link(element_name, link, abs_url)
|
element_type = IliasPage._find_type_for_element(
|
||||||
description = self._find_link_description(link)
|
element_name, abs_url, lambda: IliasPage._find_icon_for_folder_entry(link)
|
||||||
|
)
|
||||||
|
description = IliasPage._find_link_description(link)
|
||||||
|
|
||||||
# The last meeting on every page is expanded by default.
|
# The last meeting on every page is expanded by default.
|
||||||
# Its content is then shown inline *and* in the meeting page itself.
|
# Its content is then shown inline *and* in the meeting page itself.
|
||||||
@ -805,10 +998,10 @@ class IliasPage:
|
|||||||
if not element_type:
|
if not element_type:
|
||||||
continue
|
continue
|
||||||
elif element_type == IliasElementType.FILE:
|
elif element_type == IliasElementType.FILE:
|
||||||
result.append(self._file_to_element(element_name, abs_url, link))
|
result.append(IliasPage._file_to_element(element_name, abs_url, link))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.explain(f"Found {element_name!r}")
|
log.explain(f"Found {element_name!r} of type {element_type}")
|
||||||
result.append(IliasPageElement.create_new(
|
result.append(IliasPageElement.create_new(
|
||||||
element_type,
|
element_type,
|
||||||
abs_url,
|
abs_url,
|
||||||
@ -826,24 +1019,36 @@ class IliasPage:
|
|||||||
def _find_mediacast_videos(self) -> list[IliasPageElement]:
|
def _find_mediacast_videos(self) -> list[IliasPageElement]:
|
||||||
videos: list[IliasPageElement] = []
|
videos: list[IliasPageElement] = []
|
||||||
|
|
||||||
for elem in cast(list[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
|
regex = re.compile(r"il\.VideoPlaylist\.init.+?\[(.+?)], ")
|
||||||
element_name = _sanitize_path_name(
|
for script in cast(list[Tag], self._soup.find_all("script")):
|
||||||
cast(Tag, elem.select_one(".ilPlayerPreviewDescription")).get_text().strip()
|
for match in regex.finditer(script.text):
|
||||||
)
|
try:
|
||||||
if not element_name.endswith(".mp4"):
|
playlist = json.loads("[" + match.group(1) + "]")
|
||||||
# just to make sure it has some kinda-alrightish ending
|
except json.JSONDecodeError:
|
||||||
element_name = element_name + ".mp4"
|
log.warn("Could not decode playlist json")
|
||||||
video_element = cast(Optional[Tag], elem.find(name="video"))
|
log.warn_contd(f"Playlist json: [{match.group(1)}]")
|
||||||
if not video_element:
|
|
||||||
_unexpected_html_warning()
|
|
||||||
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
|
|
||||||
continue
|
continue
|
||||||
|
for elem in playlist:
|
||||||
|
title = elem.get("title", None)
|
||||||
|
description = elem.get("description", None)
|
||||||
|
url = elem.get("resource", None)
|
||||||
|
if title is None or description is None or url is None:
|
||||||
|
log.explain(f"Mediacast json: {match.group(1)}")
|
||||||
|
log.warn("Mediacast video json was not complete")
|
||||||
|
if title is None:
|
||||||
|
log.warn_contd("Missing title")
|
||||||
|
if description is None:
|
||||||
|
log.warn_contd("Missing description")
|
||||||
|
if url is None:
|
||||||
|
log.warn_contd("Missing URL")
|
||||||
|
|
||||||
|
if not title.endswith(".mp4") and not title.endswith(".webm"):
|
||||||
|
# just to make sure it has some kinda-alrightish ending
|
||||||
|
title = title + ".mp4"
|
||||||
videos.append(IliasPageElement.create_new(
|
videos.append(IliasPageElement.create_new(
|
||||||
typ=IliasElementType.MEDIACAST_VIDEO,
|
typ=IliasElementType.MEDIACAST_VIDEO,
|
||||||
url=self._abs_url_from_relative(cast(str, video_element.get("src"))),
|
url=self._abs_url_from_relative(cast(str, url)),
|
||||||
name=element_name,
|
name=_sanitize_path_name(title)
|
||||||
mtime=self._find_mediacast_video_mtime(cast(Tag, elem.find_parent(name="td")))
|
|
||||||
))
|
))
|
||||||
|
|
||||||
return videos
|
return videos
|
||||||
@ -851,46 +1056,55 @@ class IliasPage:
|
|||||||
def _find_mob_videos(self) -> list[IliasPageElement]:
|
def _find_mob_videos(self) -> list[IliasPageElement]:
|
||||||
videos: list[IliasPageElement] = []
|
videos: list[IliasPageElement] = []
|
||||||
|
|
||||||
for figure in self._soup.select("figure.ilc_media_cont_MediaContainerHighlighted"):
|
selector = "figure.ilc_media_cont_MediaContainerHighlighted,figure.ilc_media_cont_MediaContainer"
|
||||||
title = cast(Tag, figure.select_one("figcaption")).get_text().strip() + ".mp4"
|
for figure in self._soup.select(selector):
|
||||||
video_element = figure.select_one("video")
|
video_element = figure.select_one("video")
|
||||||
if not video_element:
|
if not video_element:
|
||||||
_unexpected_html_warning()
|
|
||||||
log.warn_contd(f"No <video> element found for mob video '{title}'")
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
url = None
|
url, title = self._find_mob_video_url_title(video_element, figure)
|
||||||
for source in video_element.select("source"):
|
|
||||||
if source.get("type", "") == "video/mp4":
|
|
||||||
url = cast(Optional[str], source.get("src"))
|
|
||||||
break
|
|
||||||
|
|
||||||
if url is None:
|
if url is None:
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(f"No <source> element found for mob video '{title}'")
|
log.warn_contd(f"No <source> element found for mob video '{title}'")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if urlparse(url).hostname != urlparse(self._page_url).hostname:
|
||||||
|
log.explain(f"Found external video at {url}, ignoring")
|
||||||
|
continue
|
||||||
|
|
||||||
videos.append(IliasPageElement.create_new(
|
videos.append(IliasPageElement.create_new(
|
||||||
typ=IliasElementType.MOB_VIDEO,
|
typ=IliasElementType.MOB_VIDEO,
|
||||||
url=self._abs_url_from_relative(url),
|
url=url,
|
||||||
name=_sanitize_path_name(title),
|
name=_sanitize_path_name(title),
|
||||||
mtime=None
|
mtime=None
|
||||||
))
|
))
|
||||||
|
|
||||||
return videos
|
return videos
|
||||||
|
|
||||||
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
|
def _find_mob_video_url_title(self, video_element: Tag, figure: Tag) -> tuple[Optional[str], str]:
|
||||||
description_td = cast(Tag, enclosing_td.find_previous_sibling("td"))
|
url = None
|
||||||
if not description_td:
|
for source in video_element.select("source"):
|
||||||
return None
|
if source.get("type", "") == "video/mp4":
|
||||||
|
url = cast(Optional[str], source.get("src"))
|
||||||
|
break
|
||||||
|
|
||||||
meta_tag = cast(Optional[Tag], description_td.find_all("p")[-1])
|
if url is None and video_element.get("src"):
|
||||||
if not meta_tag:
|
url = cast(Optional[str], video_element.get("src"))
|
||||||
return None
|
|
||||||
|
|
||||||
updated_str = meta_tag.get_text().strip().replace("\n", " ")
|
fig_caption = cast(Optional[Tag], figure.select_one("figcaption"))
|
||||||
updated_str = re.sub(".+?: ", "", updated_str)
|
if fig_caption:
|
||||||
return demangle_date(updated_str)
|
title = cast(Tag, figure.select_one("figcaption")).get_text().strip() + ".mp4"
|
||||||
|
elif url is not None:
|
||||||
|
path = urlparse(self._abs_url_from_relative(url)).path
|
||||||
|
title = path.rsplit("/", 1)[-1]
|
||||||
|
else:
|
||||||
|
title = f"unknown video {figure}"
|
||||||
|
|
||||||
|
if url:
|
||||||
|
url = self._abs_url_from_relative(url)
|
||||||
|
|
||||||
|
return url, title
|
||||||
|
|
||||||
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
|
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
|
||||||
"""
|
"""
|
||||||
@ -907,12 +1121,17 @@ class IliasPage:
|
|||||||
# We should not crawl files under meetings
|
# We should not crawl files under meetings
|
||||||
if "ilContainerListItemContentCB" in cast(str, parent.get("class")):
|
if "ilContainerListItemContentCB" in cast(str, parent.get("class")):
|
||||||
link: Tag = parent.parent.find("a") # type: ignore
|
link: Tag = parent.parent.find("a") # type: ignore
|
||||||
type = IliasPage._find_type_from_folder_like(link, self._page_url)
|
typ = IliasPage._find_type_for_element(
|
||||||
return type == IliasElementType.MEETING
|
"meeting",
|
||||||
|
self._abs_url_from_link(link),
|
||||||
|
lambda: IliasPage._find_icon_for_folder_entry(link)
|
||||||
|
)
|
||||||
|
return typ == IliasElementType.MEETING
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _find_upwards_folder_hierarchy(self, tag: Tag) -> list[str]:
|
@staticmethod
|
||||||
|
def _find_upwards_folder_hierarchy(tag: Tag) -> list[str]:
|
||||||
"""
|
"""
|
||||||
Interprets accordions and expandable blocks as virtual folders and returns them
|
Interprets accordions and expandable blocks as virtual folders and returns them
|
||||||
in order. This allows us to find a file named "Test" in an accordion "Acc" as "Acc/Test"
|
in order. This allows us to find a file named "Test" in an accordion "Acc" as "Acc/Test"
|
||||||
@ -953,13 +1172,16 @@ class IliasPage:
|
|||||||
if outer_accordion_content:
|
if outer_accordion_content:
|
||||||
accordion_tag = cast(Tag, outer_accordion_content.parent)
|
accordion_tag = cast(Tag, outer_accordion_content.parent)
|
||||||
head_tag = cast(Tag, accordion_tag.find(attrs={
|
head_tag = cast(Tag, accordion_tag.find(attrs={
|
||||||
"class": lambda x: x is not None and "ilc_va_ihead_VAccordIHead" in x
|
"class": lambda x: x is not None and (
|
||||||
|
"ilc_va_ihead_VAccordIHead" in x or "ilc_va_ihead_AccordIHead" in x
|
||||||
|
)
|
||||||
}))
|
}))
|
||||||
found_titles.append(head_tag.get_text().strip())
|
found_titles.append(head_tag.get_text().strip())
|
||||||
|
|
||||||
return [_sanitize_path_name(x) for x in reversed(found_titles)]
|
return [_sanitize_path_name(x) for x in reversed(found_titles)]
|
||||||
|
|
||||||
def _find_link_description(self, link: Tag) -> Optional[str]:
|
@staticmethod
|
||||||
|
def _find_link_description(link: Tag) -> Optional[str]:
|
||||||
tile = cast(
|
tile = cast(
|
||||||
Tag,
|
Tag,
|
||||||
link.find_parent("div", {"class": lambda x: x is not None and "il_ContainerListItem" in x})
|
link.find_parent("div", {"class": lambda x: x is not None and "il_ContainerListItem" in x})
|
||||||
@ -974,7 +1196,8 @@ class IliasPage:
|
|||||||
return None
|
return None
|
||||||
return description_element.get_text().strip()
|
return description_element.get_text().strip()
|
||||||
|
|
||||||
def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement:
|
@staticmethod
|
||||||
|
def _file_to_element(name: str, url: str, link_element: Tag) -> IliasPageElement:
|
||||||
# Files have a list of properties (type, modification date, size, etc.)
|
# Files have a list of properties (type, modification date, size, etc.)
|
||||||
# In a series of divs.
|
# In a series of divs.
|
||||||
# Find the parent containing all those divs, so we can filter our what we need
|
# Find the parent containing all those divs, so we can filter our what we need
|
||||||
@ -1007,27 +1230,38 @@ class IliasPage:
|
|||||||
for title in card_titles:
|
for title in card_titles:
|
||||||
url = self._abs_url_from_link(title)
|
url = self._abs_url_from_link(title)
|
||||||
name = _sanitize_path_name(title.get_text().strip())
|
name = _sanitize_path_name(title.get_text().strip())
|
||||||
type = self._find_type_from_card(title)
|
typ = IliasPage._find_type_for_element(
|
||||||
|
name, url, lambda: IliasPage._find_icon_from_card(title)
|
||||||
|
)
|
||||||
|
|
||||||
if not type:
|
if not typ:
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(f"Could not extract type for {title}")
|
log.warn_contd(f"Could not extract type for {title}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
result.append(IliasPageElement.create_new(type, url, name))
|
result.append(IliasPageElement.create_new(typ, url, name))
|
||||||
|
|
||||||
card_button_tiles: list[Tag] = self._soup.select(".card-title button")
|
card_button_tiles: list[Tag] = self._soup.select(".card-title button")
|
||||||
|
|
||||||
for button in card_button_tiles:
|
for button in card_button_tiles:
|
||||||
regex = re.compile(button["id"] + r".*window.open\(['\"](.+?)['\"]") # type: ignore
|
signal_regex = re.compile("#" + str(button["id"]) + r"[\s\S]*?\.trigger\('(.+?)'")
|
||||||
res = regex.search(str(self._soup))
|
signal_match = signal_regex.search(str(self._soup))
|
||||||
if not res:
|
if not signal_match:
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(f"Could not find click handler target for {button}")
|
log.warn_contd(f"Could not find click handler signal for {button}")
|
||||||
continue
|
continue
|
||||||
url = self._abs_url_from_relative(res.group(1))
|
signal = signal_match.group(1)
|
||||||
|
open_regex = re.compile(r"\.on\('" + signal + r"[\s\S]*?window.open\(['\"](.+?)['\"]")
|
||||||
|
open_match = open_regex.search(str(self._soup))
|
||||||
|
if not open_match:
|
||||||
|
_unexpected_html_warning()
|
||||||
|
log.warn_contd(f"Could not find click handler target for signal {signal} for {button}")
|
||||||
|
continue
|
||||||
|
url = self._abs_url_from_relative(open_match.group(1))
|
||||||
name = _sanitize_path_name(button.get_text().strip())
|
name = _sanitize_path_name(button.get_text().strip())
|
||||||
type = self._find_type_from_card(button)
|
typ = IliasPage._find_type_for_element(
|
||||||
|
name, url, lambda: IliasPage._find_icon_from_card(button)
|
||||||
|
)
|
||||||
caption_parent = cast(Tag, button.find_parent(
|
caption_parent = cast(Tag, button.find_parent(
|
||||||
"div",
|
"div",
|
||||||
attrs={"class": lambda x: x is not None and "caption" in x},
|
attrs={"class": lambda x: x is not None and "caption" in x},
|
||||||
@ -1038,143 +1272,59 @@ class IliasPage:
|
|||||||
else:
|
else:
|
||||||
description = None
|
description = None
|
||||||
|
|
||||||
if not type:
|
if not typ:
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(f"Could not extract type for {button}")
|
log.warn_contd(f"Could not extract type for {button}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
result.append(IliasPageElement.create_new(type, url, name, description=description))
|
result.append(IliasPageElement.create_new(typ, url, name, description=description))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def _find_type_from_card(self, card_title: Tag) -> Optional[IliasElementType]:
|
|
||||||
def is_card_root(element: Tag) -> bool:
|
|
||||||
return "il-card" in element["class"] and "thumbnail" in element["class"]
|
|
||||||
|
|
||||||
card_root: Optional[Tag] = None
|
|
||||||
|
|
||||||
# We look for the card root
|
|
||||||
for parent in card_title.parents:
|
|
||||||
if is_card_root(parent):
|
|
||||||
card_root = parent
|
|
||||||
break
|
|
||||||
|
|
||||||
if card_root is None:
|
|
||||||
_unexpected_html_warning()
|
|
||||||
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {card_title}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
icon = cast(Tag, card_root.select_one(".il-card-repository-head .icon"))
|
|
||||||
|
|
||||||
if "opencast" in icon["class"] or "xoct" in icon["class"]:
|
|
||||||
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
|
|
||||||
if "exc" in icon["class"]:
|
|
||||||
return IliasElementType.EXERCISE
|
|
||||||
if "grp" in icon["class"]:
|
|
||||||
return IliasElementType.FOLDER
|
|
||||||
if "webr" in icon["class"]:
|
|
||||||
return IliasElementType.LINK
|
|
||||||
if "book" in icon["class"]:
|
|
||||||
return IliasElementType.BOOKING
|
|
||||||
if "crsr" in icon["class"]:
|
|
||||||
return IliasElementType.COURSE
|
|
||||||
if "frm" in icon["class"]:
|
|
||||||
return IliasElementType.FORUM
|
|
||||||
if "sess" in icon["class"]:
|
|
||||||
return IliasElementType.MEETING
|
|
||||||
if "tst" in icon["class"]:
|
|
||||||
return IliasElementType.TEST
|
|
||||||
if "fold" in icon["class"]:
|
|
||||||
return IliasElementType.FOLDER
|
|
||||||
if "copa" in icon["class"]:
|
|
||||||
return IliasElementType.FOLDER
|
|
||||||
if "svy" in icon["class"]:
|
|
||||||
return IliasElementType.SURVEY
|
|
||||||
if "file" in icon["class"]:
|
|
||||||
return IliasElementType.FILE
|
|
||||||
if "mcst" in icon["class"]:
|
|
||||||
return IliasElementType.MEDIACAST_VIDEO_FOLDER
|
|
||||||
|
|
||||||
_unexpected_html_warning()
|
|
||||||
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _find_type_from_link(
|
def _find_type_for_element(
|
||||||
element_name: str,
|
element_name: str,
|
||||||
link_element: Tag,
|
url: str,
|
||||||
url: str
|
icon_for_element: Callable[[], Optional[Tag]],
|
||||||
) -> Optional[IliasElementType]:
|
) -> Optional[IliasElementType]:
|
||||||
"""
|
"""
|
||||||
Decides which sub crawler to use for a given top level element.
|
Decides which sub crawler to use for a given top level element.
|
||||||
"""
|
"""
|
||||||
parsed_url = urlparse(url)
|
parsed_url = urlparse(url)
|
||||||
|
icon = icon_for_element()
|
||||||
|
|
||||||
# file URLs contain "target=file"
|
def try_matcher(matcher: IliasElementMatcher) -> bool:
|
||||||
if "target=file_" in parsed_url.query:
|
match matcher:
|
||||||
return IliasElementType.FILE
|
case TypeMatcher.All(matchers=ms):
|
||||||
|
return all(try_matcher(m) for m in ms)
|
||||||
|
case TypeMatcher.Any(matchers=ms):
|
||||||
|
return any(try_matcher(m) for m in ms)
|
||||||
|
case TypeMatcher.ImgAlt(alt=alt):
|
||||||
|
return icon is not None and alt in str(icon["alt"]).lower()
|
||||||
|
case TypeMatcher.ImgSrc(src=src):
|
||||||
|
return icon is not None and src in str(icon["src"]).lower()
|
||||||
|
case TypeMatcher.UrlPath(path=path):
|
||||||
|
return path in parsed_url.path.lower()
|
||||||
|
case TypeMatcher.UrlParameter(query=query):
|
||||||
|
return query in parsed_url.query.lower()
|
||||||
|
|
||||||
if "target=grp_" in parsed_url.query:
|
raise CrawlError(f"Unknown matcher {matcher}")
|
||||||
return IliasElementType.FOLDER
|
|
||||||
|
|
||||||
if "target=crs_" in parsed_url.query:
|
for typ in IliasElementType:
|
||||||
return IliasElementType.FOLDER
|
if try_matcher(typ.matcher()):
|
||||||
|
return typ
|
||||||
if "baseClass=ilExerciseHandlerGUI" in parsed_url.query:
|
|
||||||
return IliasElementType.EXERCISE
|
|
||||||
|
|
||||||
if "baseClass=ilLinkResourceHandlerGUI" in parsed_url.query and "calldirectlink" in parsed_url.query:
|
|
||||||
return IliasElementType.LINK
|
|
||||||
|
|
||||||
if "cmd=showThreads" in parsed_url.query or "target=frm_" in parsed_url.query:
|
|
||||||
return IliasElementType.FORUM
|
|
||||||
|
|
||||||
if "cmdClass=ilobjtestgui" in parsed_url.query:
|
|
||||||
return IliasElementType.TEST
|
|
||||||
|
|
||||||
if "baseClass=ilLMPresentationGUI" in parsed_url.query:
|
|
||||||
return IliasElementType.LEARNING_MODULE
|
|
||||||
|
|
||||||
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
|
|
||||||
return IliasElementType.MEDIACAST_VIDEO_FOLDER
|
|
||||||
|
|
||||||
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
|
|
||||||
return IliasElementType.SCORM_LEARNING_MODULE
|
|
||||||
|
|
||||||
# other universities might have content type specified in URL path
|
|
||||||
if "_file_" in parsed_url.path:
|
|
||||||
return IliasElementType.FILE
|
|
||||||
|
|
||||||
if "_fold_" in parsed_url.path or "_copa_" in parsed_url.path:
|
|
||||||
return IliasElementType.FOLDER
|
|
||||||
|
|
||||||
if "_frm_" in parsed_url.path:
|
|
||||||
return IliasElementType.FORUM
|
|
||||||
|
|
||||||
if "_exc_" in parsed_url.path:
|
|
||||||
return IliasElementType.EXERCISE
|
|
||||||
|
|
||||||
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
|
|
||||||
# try to guess it from the image.
|
|
||||||
|
|
||||||
# Everything with a ref_id can *probably* be opened to reveal nested things
|
|
||||||
# video groups, directories, exercises, etc
|
|
||||||
if "ref_id=" in parsed_url.query or "goto.php" in parsed_url.path:
|
|
||||||
return IliasPage._find_type_from_folder_like(link_element, url)
|
|
||||||
|
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(
|
log.warn_contd(f"Tried to figure out element type, but failed for {element_name!r} / {url!r})")
|
||||||
f"Tried to figure out element type, but failed for {element_name!r} / {link_element!r})"
|
|
||||||
)
|
if "ref_id=" in parsed_url.query.lower() or "goto.php" in parsed_url.path.lower():
|
||||||
|
log.warn_contd("Defaulting to FOLDER as it contains a ref_id/goto")
|
||||||
|
return IliasElementType.FOLDER
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]:
|
def _find_icon_for_folder_entry(link_element: Tag) -> Optional[Tag]:
|
||||||
"""
|
|
||||||
Try crawling something that looks like a folder.
|
|
||||||
"""
|
|
||||||
# pylint: disable=too-many-return-statements
|
|
||||||
|
|
||||||
found_parent: Optional[Tag] = None
|
found_parent: Optional[Tag] = None
|
||||||
|
|
||||||
# We look for the outer div of our inner link, to find information around it
|
# We look for the outer div of our inner link, to find information around it
|
||||||
@ -1186,7 +1336,9 @@ class IliasPage:
|
|||||||
|
|
||||||
if found_parent is None:
|
if found_parent is None:
|
||||||
_unexpected_html_warning()
|
_unexpected_html_warning()
|
||||||
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {url}")
|
log.warn_contd(
|
||||||
|
f"Tried to figure out element type, but did not find an icon for {link_element!r}"
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Find the small descriptive icon to figure out the type
|
# Find the small descriptive icon to figure out the type
|
||||||
@ -1203,42 +1355,35 @@ class IliasPage:
|
|||||||
log.explain("Found session expansion button, skipping it as it has no content")
|
log.explain("Found session expansion button, skipping it as it has no content")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if img_tag is None:
|
if img_tag is not None:
|
||||||
_unexpected_html_warning()
|
return img_tag
|
||||||
log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}")
|
|
||||||
|
log.explain(f"Tried to figure out element type, but did not find an image for {link_element!r}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if "opencast" in str(img_tag["alt"]).lower():
|
@staticmethod
|
||||||
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
|
def _find_icon_from_card(card_title: Tag) -> Optional[Tag]:
|
||||||
|
def is_card_root(element: Tag) -> bool:
|
||||||
|
return "il-card" in element["class"] and "thumbnail" in element["class"]
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("icon_exc.svg"):
|
card_root: Optional[Tag] = None
|
||||||
return IliasElementType.EXERCISE
|
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("icon_webr.svg"):
|
# We look for the card root
|
||||||
return IliasElementType.LINK
|
for parent in card_title.parents:
|
||||||
|
if is_card_root(parent):
|
||||||
|
card_root = parent
|
||||||
|
break
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("icon_book.svg"):
|
if card_root is None:
|
||||||
return IliasElementType.BOOKING
|
_unexpected_html_warning()
|
||||||
|
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {card_title}")
|
||||||
|
return None
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("frm.svg"):
|
return cast(Tag, card_root.select_one(".il-card-repository-head .icon"))
|
||||||
return IliasElementType.FORUM
|
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("sess.svg"):
|
|
||||||
return IliasElementType.MEETING
|
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("icon_tst.svg"):
|
|
||||||
return IliasElementType.TEST
|
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("icon_mcst.svg"):
|
|
||||||
return IliasElementType.MEDIACAST_VIDEO_FOLDER
|
|
||||||
|
|
||||||
if str(img_tag["src"]).endswith("icon_sahs.svg"):
|
|
||||||
return IliasElementType.SCORM_LEARNING_MODULE
|
|
||||||
|
|
||||||
return IliasElementType.FOLDER
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_logged_in(soup: BeautifulSoup) -> bool:
|
def is_logged_in(ilias_soup: IliasSoup) -> bool:
|
||||||
|
soup = ilias_soup.soup
|
||||||
# Normal ILIAS pages
|
# Normal ILIAS pages
|
||||||
mainbar = cast(Optional[Tag], soup.find(class_="il-maincontrols-metabar"))
|
mainbar = cast(Optional[Tag], soup.find(class_="il-maincontrols-metabar"))
|
||||||
if mainbar is not None:
|
if mainbar is not None:
|
||||||
@ -1285,7 +1430,7 @@ class IliasPage:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def get_permalink(self) -> Optional[str]:
|
def get_permalink(self) -> Optional[str]:
|
||||||
return IliasPage.get_soup_permalink(self._soup)
|
return IliasPage.get_soup_permalink(self._ilias_soup)
|
||||||
|
|
||||||
def _abs_url_from_link(self, link_tag: Tag) -> str:
|
def _abs_url_from_link(self, link_tag: Tag) -> str:
|
||||||
"""
|
"""
|
||||||
@ -1300,11 +1445,15 @@ class IliasPage:
|
|||||||
return urljoin(self._page_url, relative_url)
|
return urljoin(self._page_url, relative_url)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_soup_permalink(soup: BeautifulSoup) -> Optional[str]:
|
def get_soup_permalink(ilias_soup: IliasSoup) -> Optional[str]:
|
||||||
perma_link_element = cast(Tag, soup.select_one(".il-footer-permanent-url > a"))
|
scripts = cast(list[Tag], ilias_soup.soup.find_all("script"))
|
||||||
if not perma_link_element or not perma_link_element.get("href"):
|
pattern = re.compile(r"il\.Footer\.permalink\.copyText\(\"(.+?)\"\)")
|
||||||
|
for script in scripts:
|
||||||
|
if match := pattern.search(script.text):
|
||||||
|
url = match.group(1)
|
||||||
|
url = url.replace(r"\/", "/")
|
||||||
|
return url
|
||||||
return None
|
return None
|
||||||
return cast(Optional[str], perma_link_element.get("href"))
|
|
||||||
|
|
||||||
|
|
||||||
def _unexpected_html_warning() -> None:
|
def _unexpected_html_warning() -> None:
|
||||||
|
Reference in New Issue
Block a user