Reformat and switch to ruff

This commit is contained in:
I-Al-Istannen
2025-10-19 15:19:43 +02:00
parent ee4625be78
commit 2cf0e060ed
31 changed files with 1507 additions and 587 deletions

View File

@@ -1,5 +1,9 @@
from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler,
KitIliasWebCrawlerSection)
from .kit_ilias_web_crawler import (
IliasWebCrawler,
IliasWebCrawlerSection,
KitIliasWebCrawler,
KitIliasWebCrawlerSection,
)
__all__ = [
"IliasWebCrawler",

View File

@@ -254,8 +254,8 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
)
if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"):
bot_nav.replace_with(soupify(nav_template.replace(
"{{left}}", left).replace("{{right}}", right).encode())
bot_nav.replace_with(
soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode())
)
body_str = cast(str, body.prettify())
@@ -265,10 +265,11 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str:
if title := cast(Optional[bs4.Tag], heading.find(name="b")):
title.wrap(bs4.Tag(name="a", attrs={"href": url}))
return _forum_thread_template \
.replace("{{name}}", name) \
.replace("{{heading}}", cast(str, heading.prettify())) \
return (
_forum_thread_template.replace("{{name}}", name)
.replace("{{heading}}", cast(str, heading.prettify()))
.replace("{{content}}", cast(str, content.prettify()))
)
@dataclasses.dataclass
@@ -330,8 +331,7 @@ class Links(Enum):
# All others get coerced to fancy
content = cast(str, Links.FANCY.template())
repeated_content = cast(
re.Match[str],
re.search(r"<!-- REPEAT START -->([\s\S]+)<!-- REPEAT END -->", content)
re.Match[str], re.search(r"<!-- REPEAT START -->([\s\S]+)<!-- REPEAT END -->", content)
).group(1)
parts = []

View File

@@ -86,7 +86,7 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")):
block.name = "h3"
block["class"] += ["accordion-head"] # type: ignore
block["class"] += ["accordion-head"]
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children)

View File

@@ -21,8 +21,16 @@ from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .async_helper import _iorepeat
from .file_templates import LinkData, Links, forum_thread_template, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
from .kit_ilias_html import (
IliasElementType,
IliasForumThread,
IliasLearningModulePage,
IliasPage,
IliasPageElement,
IliasSoup,
_sanitize_path_name,
parse_ilias_forum_export,
)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
@@ -55,9 +63,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
@@ -166,17 +172,19 @@ class IliasWebCrawler(HttpCrawler):
name: str,
section: IliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator]
authenticators: Dict[str, Authenticator],
):
# Setting a main authenticator for cookie sharing
auth = section.auth(authenticators)
super().__init__(name, section, config, shared_auth=auth)
if section.tasks() > 1:
log.warn("""
log.warn(
"""
Please avoid using too many parallel requests as these are the KIT ILIAS
instance's greatest bottleneck.
""".strip())
""".strip()
)
self._auth = auth
self._base_url = section.base_url()
@@ -210,22 +218,19 @@ instance's greatest bottleneck.
# Start crawling at the given course
root_url = url_set_query_param(
urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}",
"target",
f"crs_{course_id}",
)
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
await self._crawl_url(
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"),
crawl_nested_courses=True
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), crawl_nested_courses=True
)
async def _crawl_url(
self,
url: str,
expected_id: Optional[int] = None,
crawl_nested_courses: bool = False
self, url: str, expected_id: Optional[int] = None, crawl_nested_courses: bool = False
) -> None:
if awaitable := await self._handle_ilias_page(
url, None, PurePath("."), expected_id, crawl_nested_courses
@@ -238,7 +243,7 @@ instance's greatest bottleneck.
current_element: Optional[IliasPageElement],
path: PurePath,
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False
crawl_nested_courses: bool = False,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
@@ -319,10 +324,7 @@ instance's greatest bottleneck.
# works correctly.
@anoncritical
async def _handle_ilias_element(
self,
parent_path: PurePath,
element: IliasPageElement,
crawl_nested_courses: bool = False
self, parent_path: PurePath, element: IliasPageElement, crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]:
# element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
@@ -344,7 +346,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'videos')"
"[bright_black](enable with option 'videos')",
)
return None
@@ -356,7 +358,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'forums')"
"[bright_black](enable with option 'forums')",
)
return None
return await self._handle_forum(element, element_path)
@@ -365,7 +367,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](tests contain no relevant data)"
"[bright_black](tests contain no relevant data)",
)
return None
elif element.type == IliasElementType.SURVEY:
@@ -373,7 +375,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](surveys contain no relevant data)"
"[bright_black](surveys contain no relevant data)",
)
return None
elif element.type == IliasElementType.SCORM_LEARNING_MODULE:
@@ -381,7 +383,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](scorm learning modules are not supported)"
"[bright_black](scorm learning modules are not supported)",
)
return None
elif element.type == IliasElementType.LITERATURE_LIST:
@@ -389,7 +391,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](literature lists are not currently supported)"
"[bright_black](literature lists are not currently supported)",
)
return None
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
@@ -397,7 +399,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](HTML learning modules are not supported)"
"[bright_black](HTML learning modules are not supported)",
)
return None
elif element.type == IliasElementType.BLOG:
@@ -405,7 +407,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](blogs are not currently supported)"
"[bright_black](blogs are not currently supported)",
)
return None
elif element.type == IliasElementType.DCL_RECORD_LIST:
@@ -413,7 +415,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](dcl record lists are not currently supported)"
"[bright_black](dcl record lists are not currently supported)",
)
return None
elif element.type == IliasElementType.MEDIA_POOL:
@@ -421,7 +423,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](media pools are not currently supported)"
"[bright_black](media pools are not currently supported)",
)
return None
elif element.type == IliasElementType.COURSE:
@@ -431,7 +433,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](not descending into linked course)"
"[bright_black](not descending into linked course)",
)
return None
elif element.type == IliasElementType.WIKI:
@@ -439,7 +441,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](wikis are not currently supported)"
"[bright_black](wikis are not currently supported)",
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
@@ -513,19 +515,15 @@ instance's greatest bottleneck.
@anoncritical
@_iorepeat(3, "resolving link")
async def _download_link(
self,
link_renderer: Links,
collection_name: str,
links: list[LinkData],
dl: DownloadToken
self, link_renderer: Links, collection_name: str, links: list[LinkData], dl: DownloadToken
) -> None:
async with dl as (bar, sink):
rendered = link_renderer.interpolate(self._link_file_redirect_delay, collection_name, links)
sink.file.write(rendered.encode("utf-8"))
sink.done()
async def _resolve_link_target(self, export_url: str) -> Union[BeautifulSoup, Literal['none']]:
async def impl() -> Optional[Union[BeautifulSoup, Literal['none']]]:
async def _resolve_link_target(self, export_url: str) -> Union[BeautifulSoup, Literal["none"]]:
async def impl() -> Optional[Union[BeautifulSoup, Literal["none"]]]:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
@@ -626,7 +624,7 @@ instance's greatest bottleneck.
if self.prev_report:
self.report.add_custom_value(
_get_video_cache_key(element),
self.prev_report.get_custom_value(_get_video_cache_key(element))
self.prev_report.get_custom_value(_get_video_cache_key(element)),
)
# A video might contain other videos, so let's "crawl" the video first
@@ -698,7 +696,7 @@ instance's greatest bottleneck.
def add_to_report(paths: list[str]) -> None:
self.report.add_custom_value(
_get_video_cache_key(element),
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))},
)
async with dl as (bar, sink):
@@ -752,11 +750,7 @@ instance's greatest bottleneck.
await self._stream_from_url(element, sink, bar, is_video)
async def _stream_from_url(
self,
element: IliasPageElement,
sink: FileSink,
bar: ProgressBar,
is_video: bool
self, element: IliasPageElement, sink: FileSink, bar: ProgressBar, is_video: bool
) -> None:
url = element.url
@@ -831,10 +825,10 @@ instance's greatest bottleneck.
log.warn("Could not extract forum export url")
return
export = await self._post(export_url, {
"format": "html",
"cmd[createExportFile]": ""
})
export = await self._post(
export_url,
{"format": "html", "cmd[createExportFile]": ""},
)
elements = parse_ilias_forum_export(soupify(export))
@@ -848,10 +842,7 @@ instance's greatest bottleneck.
@anoncritical
@_iorepeat(3, "saving forum thread")
async def _download_forum_thread(
self,
parent_path: PurePath,
thread: Union[IliasForumThread, IliasPageElement],
forum_url: str
self, parent_path: PurePath, thread: Union[IliasForumThread, IliasPageElement], forum_url: str
) -> None:
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=thread.mtime)
@@ -860,10 +851,7 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink):
rendered = forum_thread_template(
thread.name,
forum_url,
thread.name_tag,
await self.internalize_images(thread.content_tag)
thread.name, forum_url, thread.name_tag, await self.internalize_images(thread.content_tag)
)
sink.file.write(rendered.encode("utf-8"))
sink.done()
@@ -891,13 +879,13 @@ instance's greatest bottleneck.
soup = await self._get_page(element.url)
page = IliasPage(soup, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element
))
elements.extend(
await self._crawl_learning_module_direction(cl.path, next.previous_url, "left", element)
)
elements.append(next)
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.next_url, "right", element
))
elements.extend(
await self._crawl_learning_module_direction(cl.path, next.next_url, "right", element)
)
# Reflect their natural ordering in the file names
for index, lm_element in enumerate(elements):
@@ -907,9 +895,9 @@ instance's greatest bottleneck.
for index, elem in enumerate(elements):
prev_url = elements[index - 1].title if index > 0 else None
next_url = elements[index + 1].title if index < len(elements) - 1 else None
tasks.append(asyncio.create_task(
self._download_learning_module_page(cl.path, elem, prev_url, next_url)
))
tasks.append(
asyncio.create_task(self._download_learning_module_page(cl.path, elem, prev_url, next_url))
)
# And execute them
await self.gather(tasks)
@@ -919,7 +907,7 @@ instance's greatest bottleneck.
path: PurePath,
start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]],
parent_element: IliasPageElement
parent_element: IliasPageElement,
) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = []
@@ -950,7 +938,7 @@ instance's greatest bottleneck.
parent_path: PurePath,
element: IliasLearningModulePage,
prev: Optional[str],
next: Optional[str]
next: Optional[str],
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path)
@@ -1037,11 +1025,7 @@ instance's greatest bottleneck.
)
return soup
async def _post(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> bytes:
async def _post(self, url: str, data: dict[str, Union[str, List[str]]]) -> bytes:
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
@@ -1090,8 +1074,8 @@ instance's greatest bottleneck.
username, password = await self._auth.credentials()
login_form_data = aiohttp.FormData()
login_form_data.add_field('login_form/input_3/input_4', username)
login_form_data.add_field('login_form/input_3/input_5', password)
login_form_data.add_field("login_form/input_3/input_4", username)
login_form_data.add_field("login_form/input_3/input_5", password)
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request:

View File

@@ -42,15 +42,15 @@ class TypeMatcher:
self.alt = alt
class All:
matchers: list['IliasElementMatcher']
matchers: list["IliasElementMatcher"]
def __init__(self, matchers: list['IliasElementMatcher']):
def __init__(self, matchers: list["IliasElementMatcher"]):
self.matchers = matchers
class Any:
matchers: list['IliasElementMatcher']
matchers: list["IliasElementMatcher"]
def __init__(self, matchers: list['IliasElementMatcher']):
def __init__(self, matchers: list["IliasElementMatcher"]):
self.matchers = matchers
@staticmethod
@@ -70,11 +70,11 @@ class TypeMatcher:
return TypeMatcher.ImgAlt(alt)
@staticmethod
def all(*matchers: 'IliasElementMatcher') -> All:
def all(*matchers: "IliasElementMatcher") -> All:
return TypeMatcher.All(list(matchers))
@staticmethod
def any(*matchers: 'IliasElementMatcher') -> Any:
def any(*matchers: "IliasElementMatcher") -> Any:
return TypeMatcher.Any(list(matchers))
@staticmethod
@@ -127,20 +127,14 @@ class IliasElementType(Enum):
def matcher(self) -> IliasElementMatcher:
match self:
case IliasElementType.BLOG:
return TypeMatcher.any(
TypeMatcher.img_src("_blog.svg")
)
return TypeMatcher.any(TypeMatcher.img_src("_blog.svg"))
case IliasElementType.BOOKING:
return TypeMatcher.any(
TypeMatcher.path("/book/"),
TypeMatcher.img_src("_book.svg")
)
return TypeMatcher.any(TypeMatcher.path("/book/"), TypeMatcher.img_src("_book.svg"))
case IliasElementType.COURSE:
return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg"))
case IliasElementType.DCL_RECORD_LIST:
return TypeMatcher.any(
TypeMatcher.img_src("_dcl.svg"),
TypeMatcher.query("cmdclass=ildclrecordlistgui")
TypeMatcher.img_src("_dcl.svg"), TypeMatcher.query("cmdclass=ildclrecordlistgui")
)
case IliasElementType.EXERCISE:
return TypeMatcher.never()
@@ -162,14 +156,11 @@ class IliasElementType(Enum):
return TypeMatcher.any(
TypeMatcher.path("/fold/"),
TypeMatcher.img_src("_fold.svg"),
TypeMatcher.path("/grp/"),
TypeMatcher.img_src("_grp.svg"),
TypeMatcher.path("/copa/"),
TypeMatcher.path("_copa_"),
TypeMatcher.img_src("_copa.svg"),
# Not supported right now but warn users
# TypeMatcher.query("baseclass=ilmediapoolpresentationgui"),
# TypeMatcher.img_alt("medienpool"),
@@ -188,14 +179,10 @@ class IliasElementType(Enum):
case IliasElementType.LITERATURE_LIST:
return TypeMatcher.img_src("_bibl.svg")
case IliasElementType.LEARNING_MODULE:
return TypeMatcher.any(
TypeMatcher.path("/lm/"),
TypeMatcher.img_src("_lm.svg")
)
return TypeMatcher.any(TypeMatcher.path("/lm/"), TypeMatcher.img_src("_lm.svg"))
case IliasElementType.LEARNING_MODULE_HTML:
return TypeMatcher.any(
TypeMatcher.query("baseclass=ilhtlmpresentationgui"),
TypeMatcher.img_src("_htlm.svg")
TypeMatcher.query("baseclass=ilhtlmpresentationgui"), TypeMatcher.img_src("_htlm.svg")
)
case IliasElementType.LINK:
return TypeMatcher.any(
@@ -203,17 +190,16 @@ class IliasElementType(Enum):
TypeMatcher.query("baseclass=illinkresourcehandlergui"),
TypeMatcher.query("calldirectlink"),
),
TypeMatcher.img_src("_webr.svg") # duplicated :(
TypeMatcher.img_src("_webr.svg"), # duplicated :(
)
case IliasElementType.LINK_COLLECTION:
return TypeMatcher.any(
TypeMatcher.query("baseclass=illinkresourcehandlergui"),
TypeMatcher.img_src("_webr.svg") # duplicated :(
TypeMatcher.img_src("_webr.svg"), # duplicated :(
)
case IliasElementType.MEDIA_POOL:
return TypeMatcher.any(
TypeMatcher.query("baseclass=ilmediapoolpresentationgui"),
TypeMatcher.img_src("_mep.svg")
TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), TypeMatcher.img_src("_mep.svg")
)
case IliasElementType.MEDIACAST_VIDEO:
return TypeMatcher.never()
@@ -221,12 +207,10 @@ class IliasElementType(Enum):
return TypeMatcher.any(
TypeMatcher.path("/mcst/"),
TypeMatcher.query("baseclass=ilmediacasthandlergui"),
TypeMatcher.img_src("_mcst.svg")
TypeMatcher.img_src("_mcst.svg"),
)
case IliasElementType.MEETING:
return TypeMatcher.any(
TypeMatcher.img_src("_sess.svg")
)
return TypeMatcher.any(TypeMatcher.img_src("_sess.svg"))
case IliasElementType.MOB_VIDEO:
return TypeMatcher.never()
case IliasElementType.OPENCAST_VIDEO:
@@ -239,24 +223,19 @@ class IliasElementType(Enum):
return TypeMatcher.never()
case IliasElementType.SCORM_LEARNING_MODULE:
return TypeMatcher.any(
TypeMatcher.query("baseclass=ilsahspresentationgui"),
TypeMatcher.img_src("_sahs.svg")
TypeMatcher.query("baseclass=ilsahspresentationgui"), TypeMatcher.img_src("_sahs.svg")
)
case IliasElementType.SURVEY:
return TypeMatcher.any(
TypeMatcher.path("/svy/"),
TypeMatcher.img_src("svy.svg")
)
return TypeMatcher.any(TypeMatcher.path("/svy/"), TypeMatcher.img_src("svy.svg"))
case IliasElementType.TEST:
return TypeMatcher.any(
TypeMatcher.query("cmdclass=ilobjtestgui"),
TypeMatcher.query("cmdclass=iltestscreengui"),
TypeMatcher.img_src("_tst.svg")
TypeMatcher.img_src("_tst.svg"),
)
case IliasElementType.WIKI:
return TypeMatcher.any(
TypeMatcher.query("baseClass=ilwikihandlergui"),
TypeMatcher.img_src("wiki.svg")
TypeMatcher.query("baseClass=ilwikihandlergui"), TypeMatcher.img_src("wiki.svg")
)
raise CrawlWarning(f"Unknown matcher {self}")
@@ -291,7 +270,7 @@ class IliasPageElement:
r"thr_pk=(?P<id>\d+)", # forums
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
r"mm_(?P<id>\d+)",
]
for regex in regexes:
@@ -309,8 +288,8 @@ class IliasPageElement:
name: str,
mtime: Optional[datetime] = None,
description: Optional[str] = None,
skip_sanitize: bool = False
) -> 'IliasPageElement':
skip_sanitize: bool = False,
) -> "IliasPageElement":
if typ == IliasElementType.MEETING:
normalized = IliasPageElement._normalize_meeting_name(name)
log.explain(f"Normalized meeting name from {name!r} to {normalized!r}")
@@ -382,7 +361,6 @@ class IliasSoup:
class IliasPage:
def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]):
self._ilias_soup = ilias_soup
self._soup = ilias_soup.soup
@@ -422,23 +400,26 @@ class IliasPage:
return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = cast(Optional[Tag], self._soup.find(
name="a",
attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x}
))
tab: Optional[Tag] = cast(
Optional[Tag],
self._soup.find(
name="a", attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x}
),
)
if tab is not None:
return IliasPageElement.create_new(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
IliasElementType.INFO_TAB, self._abs_url_from_link(tab), "infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str | None) -> bool:
return name in [
"ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap",
"ilc_va_ihcap_AccordIHeadCap", "ilc_media_cont_MediaContainer"
"ilCOPageSection",
"ilc_Paragraph",
"ilc_va_ihcap_VAccordIHeadCap",
"ilc_va_ihcap_AccordIHeadCap",
"ilc_media_cont_MediaContainer",
]
paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class))
@@ -457,7 +438,7 @@ class IliasPage:
if video := p.select_one("video"):
url, title = self._find_mob_video_url_title(video, p)
raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;'
raw_html += 'display: flex; justify-content: center; align-items: center;'
raw_html += "display: flex; justify-content: center; align-items: center;"
raw_html += ' margin: 0.5rem;">'
if url is not None and urlparse(url).hostname != urlparse(self._page_url).hostname:
if url.startswith("//"):
@@ -486,7 +467,7 @@ class IliasPage:
title=title,
content=content,
next_url=self._find_learning_module_next(),
previous_url=self._find_learning_module_prev()
previous_url=self._find_learning_module_prev(),
)
def _find_learning_module_next(self) -> Optional[str]:
@@ -517,7 +498,7 @@ class IliasPage:
rtoken_form = cast(
Optional[Tag],
self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x})
self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x}),
)
if not rtoken_form:
log.explain("Found no rtoken anywhere")
@@ -557,9 +538,7 @@ class IliasPage:
return True
# Raw listing without ILIAS fluff
video_element_table = self._soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
return video_element_table is not None
def _is_ilias_opencast_embedding(self) -> bool:
@@ -600,24 +579,28 @@ class IliasPage:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = cast(Optional[Tag], self._soup.find(
"a",
attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
))
element = cast(
Optional[Tag],
self._soup.find(
"a",
attrs={
"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)
},
),
)
if not element:
return None
link = self._abs_url_from_link(element)
return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings")
def _is_exercise_not_all_shown(self) -> bool:
return (self._page_type == IliasElementType.EXERCISE_OVERVIEW
and "mode=all" not in self._page_url.lower())
return (
self._page_type == IliasElementType.EXERCISE_OVERVIEW and "mode=all" not in self._page_url.lower()
)
def _show_all_exercises(self) -> Optional[IliasPageElement]:
return IliasPageElement.create_new(
IliasElementType.EXERCISE_OVERVIEW,
self._page_url + "&mode=all",
"show all exercises"
IliasElementType.EXERCISE_OVERVIEW, self._page_url + "&mode=all", "show all exercises"
)
def _is_content_tab_selected(self) -> bool:
@@ -631,10 +614,12 @@ class IliasPage:
return "baseClass=ilmembershipoverviewgui" in self._page_url
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = cast(Optional[Tag], self._soup.find(
id="tab_view_content",
attrs={"class": lambda x: x is not None and "active" not in x}
))
tab = cast(
Optional[Tag],
self._soup.find(
id="tab_view_content", attrs={"class": lambda x: x is not None and "active" not in x}
),
)
# Already selected (or not found)
if not tab:
return None
@@ -654,9 +639,7 @@ class IliasPage:
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
regex = re.compile(r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE)
json_match = regex.search(str(self._soup))
if json_match is None:
@@ -687,10 +670,12 @@ class IliasPage:
def _get_show_max_forum_entries_per_page_url(
self, wanted_max: Optional[int] = None
) -> Optional[IliasPageElement]:
correct_link = cast(Optional[Tag], self._soup.find(
"a",
attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x}
))
correct_link = cast(
Optional[Tag],
self._soup.find(
"a", attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x}
),
)
if not correct_link:
return None
@@ -775,11 +760,11 @@ class IliasPage:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.get_text())
))
items.append(
IliasPageElement.create_new(
IliasElementType.FILE, self._abs_url_from_link(link), _sanitize_path_name(link.get_text())
)
)
return items
@@ -791,9 +776,9 @@ class IliasPage:
#
# We need to figure out where we are.
video_element_table = cast(Optional[Tag], self._soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
))
video_element_table = cast(
Optional[Tag], self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
)
if video_element_table is None:
# We are in stage 1
@@ -829,8 +814,7 @@ class IliasPage:
table_id = id_match.group(1)
query_params = {f"tbl_xoct_{table_id}_trows": "800",
"cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
@@ -841,9 +825,9 @@ class IliasPage:
Crawls the "second stage" video page. This page contains the actual video urls.
"""
# Video start links are marked with an "Abspielen" link
video_links = cast(list[Tag], self._soup.find_all(
name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")
))
video_links = cast(
list[Tag], self._soup.find_all(name="a", text=re.compile(r"\s*(Abspielen|Play)\s*"))
)
results: list[IliasPageElement] = []
@@ -857,12 +841,12 @@ class IliasPage:
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
modification_time = None
row: Tag = link.parent.parent.parent # type: ignore
row: Tag = link.parent.parent.parent
column_count = len(row.select("td.std"))
for index in range(column_count, 0, -1):
modification_string = link.parent.parent.parent.select_one( # type: ignore
f"td.std:nth-child({index})"
).get_text().strip()
modification_string = (
link.parent.parent.parent.select_one(f"td.std:nth-child({index})").get_text().strip()
)
if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string):
modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M")
break
@@ -871,7 +855,7 @@ class IliasPage:
log.warn(f"Could not determine upload time for {link}")
modification_time = datetime.now()
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").get_text().strip() # type: ignore
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").get_text().strip()
title += ".mp4"
video_name: str = _sanitize_path_name(title)
@@ -900,25 +884,29 @@ class IliasPage:
results: list[IliasPageElement] = []
if link := cast(Optional[Tag], self._soup.select_one("#tab_submission > a")):
results.append(IliasPageElement.create_new(
IliasElementType.EXERCISE_FILES,
self._abs_url_from_link(link),
"Submission"
))
results.append(
IliasPageElement.create_new(
IliasElementType.EXERCISE_FILES, self._abs_url_from_link(link), "Submission"
)
)
else:
log.explain("Found no submission link for exercise, maybe it has not started yet?")
# Find all download links in the container (this will contain all the *feedback* files)
download_links = cast(list[Tag], self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download"
))
download_links = cast(
list[Tag],
self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download",
),
)
for link in download_links:
parent_row: Tag = cast(Tag, link.find_parent(
attrs={"class": lambda x: x is not None and "row" in x}))
parent_row: Tag = cast(
Tag, link.find_parent(attrs={"class": lambda x: x is not None and "row" in x})
)
name_tag = cast(Optional[Tag], parent_row.find(name="div"))
if not name_tag:
@@ -929,11 +917,9 @@ class IliasPage:
name = _sanitize_path_name(name_tag.get_text().strip())
log.explain(f"Found exercise detail entry {name!r}")
results.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
name
))
results.append(
IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name)
)
return results
@@ -941,12 +927,15 @@ class IliasPage:
results: list[IliasPageElement] = []
# Find all download links in the container
download_links = cast(list[Tag], self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download"
))
download_links = cast(
list[Tag],
self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download",
),
)
for link in download_links:
parent_row: Tag = cast(Tag, link.find_parent("tr"))
@@ -963,12 +952,9 @@ class IliasPage:
if date is None:
log.warn(f"Date parsing failed for exercise file entry {name!r}")
results.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
name,
date
))
results.append(
IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name, date)
)
return results
@@ -993,11 +979,11 @@ class IliasPage:
continue
name = _sanitize_path_name(exercise.get_text().strip())
results.append(IliasPageElement.create_new(
IliasElementType.EXERCISE,
self._abs_url_from_link(exercise),
name
))
results.append(
IliasPageElement.create_new(
IliasElementType.EXERCISE, self._abs_url_from_link(exercise), name
)
)
for result in results:
log.explain(f"Found exercise {result.name!r}")
@@ -1043,13 +1029,11 @@ class IliasPage:
continue
log.explain(f"Found {element_name!r} of type {element_type}")
result.append(IliasPageElement.create_new(
element_type,
abs_url,
element_name,
description=description,
skip_sanitize=True
))
result.append(
IliasPageElement.create_new(
element_type, abs_url, element_name, description=description, skip_sanitize=True
)
)
result += self._find_cards()
result += self._find_mediacast_videos()
@@ -1086,11 +1070,13 @@ class IliasPage:
if not title.endswith(".mp4") and not title.endswith(".webm"):
# just to make sure it has some kinda-alrightish ending
title = title + ".mp4"
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(cast(str, url)),
name=_sanitize_path_name(title)
))
videos.append(
IliasPageElement.create_new(
typ=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(cast(str, url)),
name=_sanitize_path_name(title),
)
)
return videos
@@ -1114,12 +1100,11 @@ class IliasPage:
log.explain(f"Found external video at {url}, ignoring")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO,
url=url,
name=_sanitize_path_name(title),
mtime=None
))
videos.append(
IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO, url=url, name=_sanitize_path_name(title), mtime=None
)
)
return videos
@@ -1161,11 +1146,11 @@ class IliasPage:
# We should not crawl files under meetings
if "ilContainerListItemContentCB" in cast(str, parent.get("class")):
link: Tag = parent.parent.find("a") # type: ignore
link: Tag = parent.parent.find("a")
typ = IliasPage._find_type_for_element(
"meeting",
self._abs_url_from_link(link),
lambda: IliasPage._find_icon_for_folder_entry(link)
lambda: IliasPage._find_icon_for_folder_entry(link),
)
return typ == IliasElementType.MEETING
@@ -1191,9 +1176,11 @@ class IliasPage:
# This is for these weird JS-y blocks and custom item groups
if "ilContainerItemsContainer" in cast(str, parent.get("class")):
data_store_url = parent.parent.get("data-store-url", "").lower() # type: ignore
is_custom_item_group = "baseclass=ilcontainerblockpropertiesstoragegui" in data_store_url \
and "cont_block_id=" in data_store_url
data_store_url = parent.parent.get("data-store-url", "").lower()
is_custom_item_group = (
"baseclass=ilcontainerblockpropertiesstoragegui" in data_store_url
and "cont_block_id=" in data_store_url
)
# I am currently under the impression that *only* those JS blocks have an
# ilNoDisplay class.
if not is_custom_item_group and "ilNoDisplay" not in cast(str, parent.get("class")):
@@ -1212,11 +1199,15 @@ class IliasPage:
if outer_accordion_content:
accordion_tag = cast(Tag, outer_accordion_content.parent)
head_tag = cast(Tag, accordion_tag.find(attrs={
"class": lambda x: x is not None and (
"ilc_va_ihead_VAccordIHead" in x or "ilc_va_ihead_AccordIHead" in x
)
}))
head_tag = cast(
Tag,
accordion_tag.find(
attrs={
"class": lambda x: x is not None
and ("ilc_va_ihead_VAccordIHead" in x or "ilc_va_ihead_AccordIHead" in x)
}
),
)
found_titles.append(head_tag.get_text().strip())
return [_sanitize_path_name(x) for x in reversed(found_titles)]
@@ -1224,14 +1215,12 @@ class IliasPage:
@staticmethod
def _find_link_description(link: Tag) -> Optional[str]:
tile = cast(
Tag,
link.find_parent("div", {"class": lambda x: x is not None and "il_ContainerListItem" in x})
Tag, link.find_parent("div", {"class": lambda x: x is not None and "il_ContainerListItem" in x})
)
if not tile:
return None
description_element = cast(
Tag,
tile.find("div", {"class": lambda x: x is not None and "il_Description" in x})
Tag, tile.find("div", {"class": lambda x: x is not None and "il_Description" in x})
)
if not description_element:
return None
@@ -1242,9 +1231,15 @@ class IliasPage:
# Files have a list of properties (type, modification date, size, etc.)
# In a series of divs.
# Find the parent containing all those divs, so we can filter our what we need
properties_parent = cast(Tag, cast(Tag, link_element.find_parent(
"div", {"class": lambda x: x is not None and "il_ContainerListItem" in x}
)).select_one(".il_ItemProperties"))
properties_parent = cast(
Tag,
cast(
Tag,
link_element.find_parent(
"div", {"class": lambda x: x is not None and "il_ContainerListItem" in x}
),
).select_one(".il_ItemProperties"),
)
# The first one is always the filetype
file_type = cast(Tag, properties_parent.select_one("span.il_ItemProperty")).get_text().strip()
@@ -1271,9 +1266,7 @@ class IliasPage:
for title in card_titles:
url = self._abs_url_from_link(title)
name = _sanitize_path_name(title.get_text().strip())
typ = IliasPage._find_type_for_element(
name, url, lambda: IliasPage._find_icon_from_card(title)
)
typ = IliasPage._find_type_for_element(name, url, lambda: IliasPage._find_icon_from_card(title))
if not typ:
_unexpected_html_warning()
@@ -1300,13 +1293,14 @@ class IliasPage:
continue
url = self._abs_url_from_relative(open_match.group(1))
name = _sanitize_path_name(button.get_text().strip())
typ = IliasPage._find_type_for_element(
name, url, lambda: IliasPage._find_icon_from_card(button)
typ = IliasPage._find_type_for_element(name, url, lambda: IliasPage._find_icon_from_card(button))
caption_parent = cast(
Tag,
button.find_parent(
"div",
attrs={"class": lambda x: x is not None and "caption" in x},
),
)
caption_parent = cast(Tag, button.find_parent(
"div",
attrs={"class": lambda x: x is not None and "caption" in x},
))
caption_container = caption_parent.find_next_sibling("div")
if caption_container:
description = caption_container.get_text().strip()
@@ -1377,9 +1371,7 @@ class IliasPage:
if found_parent is None:
_unexpected_html_warning()
log.warn_contd(
f"Tried to figure out element type, but did not find an icon for {link_element!r}"
)
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {link_element!r}")
return None
# Find the small descriptive icon to figure out the type
@@ -1389,8 +1381,7 @@ class IliasPage:
img_tag = found_parent.select_one("img.icon")
is_session_expansion_button = found_parent.find(
"a",
attrs={"href": lambda x: x is not None and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
"a", attrs={"href": lambda x: x is not None and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content")
@@ -1447,9 +1438,7 @@ class IliasPage:
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
recursive=True, name="table", attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
@@ -1462,8 +1451,7 @@ class IliasPage:
@staticmethod
def _find_date_in_text(text: str) -> Optional[datetime]:
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
text
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)", text
)
if modification_date_match is not None:
modification_date_str = modification_date_match.group(1)
@@ -1501,8 +1489,8 @@ def _unexpected_html_warning() -> None:
log.warn("Encountered unexpected HTML structure, ignoring element.")
german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez']
english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
german_months = ["Jan", "Feb", "Mär", "Apr", "Mai", "Jun", "Jul", "Aug", "Sep", "Okt", "Nov", "Dez"]
english_months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[datetime]:
@@ -1579,7 +1567,7 @@ def parse_ilias_forum_export(forum_export: BeautifulSoup) -> list[IliasForumThre
title = cast(Tag, p.find("b")).text
if ":" in title:
title = title[title.find(":") + 1:]
title = title[title.find(":") + 1 :]
title = title.strip()
if not content_tag or content_tag.find_previous_sibling("p") != title_tag:
@@ -1604,7 +1592,7 @@ def _guess_timestamp_from_forum_post_content(content: Tag) -> Optional[datetime]
for post in posts:
text = post.text.strip()
text = text[text.rfind("|") + 1:]
text = text[text.rfind("|") + 1 :]
date = demangle_date(text, fail_silently=True)
if not date:
continue

View File

@@ -38,9 +38,7 @@ class ShibbolethLogin:
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return
soup: BeautifulSoup = soupify(await response.read())
@@ -62,7 +60,7 @@ class ShibbolethLogin:
"fudis_web_authn_assertion_input": "",
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
data["csrf_token"] = csrf_token_input["value"]
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
@@ -81,7 +79,7 @@ class ShibbolethLogin:
# (or clicking "Continue" if you have JS disabled)
relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = form = soup.find("form", {"method": "post"})["action"] # type: ignore
url = form = soup.find("form", {"method": "post"})["action"]
data = { # using the info obtained in the while loop above
"RelayState": cast(str, relay_state["value"]),
"SAMLResponse": cast(str, saml_response["value"]),
@@ -110,7 +108,7 @@ class ShibbolethLogin:
"fudis_otp_input": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data)
@staticmethod