Compare commits


1 Commit

Author SHA1 Message Date
4af02012bc Strip long path prefix from file links in report 2024-11-14 20:06:13 +01:00
20 changed files with 664 additions and 1122 deletions


@@ -14,7 +14,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-13, macos-latest]
python: ["3.11"]
python: ["3.9"]
steps:
- uses: actions/checkout@v4


@@ -22,37 +22,6 @@ ambiguous situations.
## Unreleased
## 3.8.2 - 2025-04-29
### Changed
- Explicitly mention that wikis are not supported at the moment and ignore them
### Fixed
- ILIAS-native login
- Exercise crawling
## 3.8.1 - 2025-04-17
### Fixed
- Description HTML files now specify a UTF-8 encoding
- Images in descriptions now always have a white background
## 3.8.0 - 2025-04-16
### Added
- Support for ILIAS 9
### Changed
- Added prettier CSS to forum threads
- Downloaded forum threads now link to the forum instead of the ILIAS thread
- Increase minimum supported Python version to 3.11
- Do not crawl nested courses (courses linked in other courses)
### Fixed
- File links in report on Windows
- TOTP authentication in KIT Shibboleth
- Forum crawling only considering the first 20 entries
## 3.7.0 - 2024-11-13
### Added


@@ -1,4 +1,4 @@
from typing import Optional, Tuple, cast
from typing import Optional, Tuple
import keyring
@@ -13,7 +13,7 @@ class KeyringAuthSection(AuthSection):
return self.s.get("username")
def keyring_name(self) -> str:
return cast(str, self.s.get("keyring_name", fallback=NAME))
return self.s.get("keyring_name", fallback=NAME)
class KeyringAuthenticator(Authenticator):
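
Aside on the `cast` churn in this and later files: `typing.cast` is a runtime no-op that only narrows types for mypy. configparser's `SectionProxy.get` is typed as returning an optional value, so the newer branch wraps lookups that pass a fallback. A minimal sketch:

```
import configparser
from typing import cast

parser = configparser.ConfigParser()
parser.read_string("[auth]\ntype = keyring\n")
section = parser["auth"]

# get() is typed Optional even though a non-None fallback rules None out;
# cast() silences mypy without changing runtime behaviour.
name = cast(str, section.get("keyring_name", fallback="PFERD"))
assert name == "PFERD"
```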


@@ -149,7 +149,9 @@ class CrawlerSection(Section):
return self.s.getboolean("skip", fallback=False)
def output_dir(self, name: str) -> Path:
name = name.removeprefix("crawl:")
# TODO Use removeprefix() after switching to 3.9
if name.startswith("crawl:"):
name = name[len("crawl:"):]
return Path(self.s.get("output_dir", name)).expanduser()
def redownload(self) -> Redownload:
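
The hunk above is a Python 3.9 compatibility shim: `str.removeprefix` only exists since 3.9, so the older branch guards with `startswith` and slices manually. The two are equivalent:

```
# str.removeprefix (Python 3.9+) returns the string unchanged
# when the prefix is absent, so no guard is needed.
name = "crawl:my-course"
assert name.removeprefix("crawl:") == "my-course"
assert "plain".removeprefix("crawl:") == "plain"

# Pre-3.9 spelling, as in the older branch:
if name.startswith("crawl:"):
    name = name[len("crawl:"):]
assert name == "my-course"
```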
@@ -292,35 +294,6 @@ class Crawler(ABC):
log.explain("Answer: Yes")
return CrawlToken(self._limiter, path)
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
log.explain_topic(f"Decision: Should Download {fmt_path(path)}")
if self._transformer.transform(path) is None:
log.explain("Answer: No (ignored)")
return False
should_download = self._output_dir.should_try_download(
path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if should_download:
log.explain("Answer: Yes")
return True
else:
log.explain("Answer: No")
return False
async def download(
self,
path: PurePath,


@@ -3,7 +3,7 @@ import http.cookies
import ssl
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Tuple, cast
from typing import Any, Dict, List, Optional, Tuple
import aiohttp
import certifi
@@ -22,7 +22,7 @@ ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float:
return self.s.getfloat("http_timeout", fallback=30)
return self.s.getfloat("http_timeout", fallback=20)
class HttpCrawler(Crawler):
@@ -187,12 +187,12 @@ class HttpCrawler(Crawler):
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = cast(Optional[Tag], tag.find_previous(name=f"h{level}"))
level_heading = tag.find_previous(name=f"h{level}")
if level_heading is None:
return find_associated_headings(tag, level - 1)
folder_name = level_heading.get_text().strip()
folder_name = level_heading.getText().strip()
return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names
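
The recursion changed here maps a tag to a folder path by walking to the nearest preceding heading of each level and stacking the heading texts. An illustrative, self-contained version (simplified: the real code also handles `drop_h1`):

```
from pathlib import PurePath
from bs4 import BeautifulSoup

html = "<h1>Course</h1><h2>Week 1</h2><h3>Slides</h3><a id='f'>file.pdf</a>"
soup = BeautifulSoup(html, "html.parser")

def find_associated_headings(tag, level):
    if level == 0:
        return PurePath()
    heading = tag.find_previous(name=f"h{level}")
    if heading is None:
        return find_associated_headings(tag, level - 1)
    folder_name = heading.get_text().strip()
    return find_associated_headings(heading, level - 1) / folder_name

print(find_associated_headings(soup.find(id="f"), 3))  # Course/Week 1/Slides
```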
@@ -231,7 +231,6 @@ class HttpCrawler(Crawler):
etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified")
last_modified = None
if last_modified_header:
try:


@@ -1,5 +1,5 @@
from enum import Enum
from typing import Optional, cast
from typing import Optional
import bs4
@@ -126,88 +126,6 @@ _learning_module_template = """
</html>
"""
_forum_thread_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ILIAS - Forum: {{name}}</title>
<style>
* {
box-sizing: border-box;
}
body {
font-family: 'Open Sans', Verdana, Arial, Helvetica, sans-serif;
padding: 8px;
}
ul, ol, p {
margin: 1.2em 0;
}
p {
margin-top: 8px;
margin-bottom: 8px;
}
a {
color: #00876c;
text-decoration: none;
cursor: pointer;
}
a:hover {
text-decoration: underline;
}
body > p:first-child > span:first-child {
font-size: 1.6em;
}
body > p:first-child > span:first-child ~ span.default {
display: inline-block;
font-size: 1.2em;
padding-bottom: 8px;
}
.ilFrmPostContent {
margin-top: 8px;
max-width: 64em;
}
.ilFrmPostContent > *:first-child {
margin-top: 0px;
}
.ilFrmPostTitle {
margin-top: 24px;
color: #00876c;
font-weight: bold;
}
#ilFrmPostList {
list-style: none;
padding-left: 0;
}
li.ilFrmPostRow {
padding: 3px 0 3px 3px;
margin-bottom: 24px;
border-left: 6px solid #dddddd;
}
.ilFrmPostRow > div {
display: flex;
}
.ilFrmPostImage img {
margin: 0 !important;
padding: 6px 9px 9px 6px;
}
.ilUserIcon {
width: 115px;
}
.small {
text-decoration: none;
font-size: 0.75rem;
color: #6f6f6f;
}
</style>
</head>
<body>
{{heading}}
{{content}}
</body>
</html>
""".strip() # noqa: E501 line too long
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those.
@@ -221,13 +139,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
</div>
"""
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip()
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip()
left = f'<a href="{prev}">{text}</a>'
else:
left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip()
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip()
right = f'<a href="{next}">{text}</a>'
else:
right = "<span></span>"
@@ -242,17 +160,8 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
"{{left}}", left).replace("{{right}}", right).encode())
)
body_str = cast(str, body.prettify())
return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name)
def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str:
if title := cast(Optional[bs4.Tag], heading.find(name="b")):
title.wrap(bs4.Tag(name="a", attrs={"href": url}))
return _forum_thread_template \
.replace("{{name}}", name) \
.replace("{{heading}}", cast(str, heading.prettify())) \
.replace("{{content}}", cast(str, content.prettify()))
body = body.prettify()
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name)
class Links(Enum):
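
Both templates in this file are filled by chaining plain `str.replace` calls on `{{placeholder}}` markers rather than using a template engine; reduced to its essence:

```
_template = "<h1>{{name}}</h1>\n<div>{{body}}</div>"

def render(name, body):
    # Plain substitution; the values must not themselves contain markers.
    return _template.replace("{{name}}", name).replace("{{body}}", body)

assert render("Forum", "<p>hi</p>") == "<h1>Forum</h1>\n<div><p>hi</p></div>"
```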


@@ -1,5 +1,3 @@
from typing import cast
from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """
@@ -39,10 +37,6 @@ _STYLE_TAG_CONTENT = """
margin: 0.5rem 0;
}
img {
background-color: white;
}
body {
padding: 1em;
grid-template-columns: 1fr min(60rem, 90%) 1fr;
@@ -60,11 +54,12 @@ _ARTICLE_WORTHY_CLASSES = [
def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
head = soup.new_tag("head")
soup.insert(0, head)
# Force UTF-8 encoding
head.append(soup.new_tag("meta", charset="utf-8"))
simplecss_link: Tag = soup.new_tag("link")
# <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css">
head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css"))
simplecss_link["rel"] = "stylesheet"
simplecss_link["href"] = "https://cdn.simplecss.org/simple.css"
head.append(simplecss_link)
# Basic style tags for compat
style: Tag = soup.new_tag("style")
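
Both branches build the same `<link>` tag; `new_tag` accepts attributes as keyword arguments, which the newer one-liner relies on. A quick equivalence check:

```
from bs4 import BeautifulSoup

soup = BeautifulSoup("", "html.parser")

one_liner = soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css")

step_by_step = soup.new_tag("link")
step_by_step["rel"] = "stylesheet"
step_by_step["href"] = "https://cdn.simplecss.org/simple.css"

assert str(one_liner) == str(step_by_step)
```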
@@ -75,18 +70,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)):
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES):
block.name = "article"
for block in cast(list[Tag], soup.find_all("h3")):
for block in soup.find_all("h3"):
block.name = "div"
for block in cast(list[Tag], soup.find_all("h1")):
for block in soup.find_all("h1"):
block.name = "h3"
for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")):
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"):
block.name = "h3"
block["class"] += ["accordion-head"] # type: ignore
block["class"] += ["accordion-head"]
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children)
@@ -102,7 +97,7 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if figure := video.find_parent("figure"):
figure.decompose()
for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")):
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr"))
return soup


@@ -19,10 +19,10 @@ from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .async_helper import _iorepeat
from .file_templates import Links, forum_thread_template, learning_module_template
from .file_templates import Links, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
@@ -105,9 +105,9 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
_DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.COURSE,
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.EXERCISE_OVERVIEW,
IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
@@ -217,19 +217,11 @@ instance's greatest bottleneck.
async def _crawl_desktop(self) -> None:
await self._crawl_url(
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"),
crawl_nested_courses=True
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show")
)
async def _crawl_url(
self,
url: str,
expected_id: Optional[int] = None,
crawl_nested_courses: bool = False
) -> None:
if awaitable := await self._handle_ilias_page(
url, None, PurePath("."), expected_id, crawl_nested_courses
):
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id):
await awaitable
async def _handle_ilias_page(
@@ -238,7 +230,6 @@ instance's greatest bottleneck.
current_element: Optional[IliasPageElement],
path: PurePath,
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
@@ -246,9 +237,7 @@ instance's greatest bottleneck.
if current_element:
self._ensure_not_seen(current_element, path)
return self._crawl_ilias_page(
url, current_element, maybe_cl, expected_course_id, crawl_nested_courses
)
return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id)
@anoncritical
async def _crawl_ilias_page(
@@ -257,7 +246,6 @@ instance's greatest bottleneck.
current_element: Optional[IliasPageElement],
cl: CrawlToken,
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False,
) -> None:
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
@@ -269,7 +257,6 @@ instance's greatest bottleneck.
async with cl:
next_stage_url: Optional[str] = url
current_parent = current_element
page = None
while next_stage_url:
soup = await self._get_page(next_stage_url)
@@ -279,19 +266,18 @@ instance's greatest bottleneck.
# If we expect to find a root course, enforce it
if current_parent is None and expected_course_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs/" not in perma_link:
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
if str(expected_course_id) not in perma_link:
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
page = IliasPage(soup, current_parent)
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
page = cast(IliasPage, page)
elements.extend(page.get_child_elements())
if description_string := page.get_description():
description.append(description_string)
@@ -306,7 +292,7 @@ instance's greatest bottleneck.
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses):
if handle := await self._handle_ilias_element(cl.path, element):
tasks.append(asyncio.create_task(handle))
# And execute them
@@ -322,7 +308,6 @@ instance's greatest bottleneck.
self,
parent_path: PurePath,
element: IliasPageElement,
crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]:
# element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
@@ -375,64 +360,6 @@ instance's greatest bottleneck.
"[bright_black](scorm learning modules are not supported)"
)
return None
elif element.type == IliasElementType.LITERATURE_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](literature lists are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](HTML learning modules are not supported)"
)
return None
elif element.type == IliasElementType.BLOG:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](blogs are not currently supported)"
)
return None
elif element.type == IliasElementType.DCL_RECORD_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](dcl record lists are not currently supported)"
)
return None
elif element.type == IliasElementType.MEDIA_POOL:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](media pools are not currently supported)"
)
return None
elif element.type == IliasElementType.COURSE:
if crawl_nested_courses:
return await self._handle_ilias_page(element.url, element, element_path)
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](not descending into linked course)"
)
return None
elif element.type == IliasElementType.WIKI:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](wikis are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK:
@@ -534,10 +461,10 @@ instance's greatest bottleneck.
if not dl:
return
async with dl as (_bar, sink):
async with dl as (bar, sink):
description = clean(insert_base_markup(description))
description_tag = await self.internalize_images(description)
sink.file.write(cast(str, description_tag.prettify()).encode("utf-8"))
description = await self.internalize_images(description)
sink.file.write(description.prettify().encode("utf-8"))
sink.done()
@anoncritical
@@ -556,7 +483,7 @@ instance's greatest bottleneck.
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip() # type: ignore
return soupify(await resp.read()).select_one("a").get("href").strip()
# We are either unauthenticated or the link is not active
new_url = resp.headers[hdrs.LOCATION].lower()
if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url:
@@ -661,7 +588,7 @@ instance's greatest bottleneck.
)
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element)
page = IliasPage(await self._get_page(element.url), element.url, element)
stream_elements = page.get_child_elements()
if len(stream_elements) > 1:
@@ -671,7 +598,7 @@ instance's greatest bottleneck.
stream_element = stream_elements[0]
# We do not have a local cache yet
await self._stream_from_url(stream_element, sink, bar, is_video=True)
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))])
return
@@ -686,7 +613,7 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element, sink, bar, is_video=True)
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report(contained_video_paths)
@@ -708,19 +635,12 @@ instance's greatest bottleneck.
async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
await self._stream_from_url(element, sink, bar, is_video)
async def _stream_from_url(
self,
element: IliasPageElement,
sink: FileSink,
bar: ProgressBar,
is_video: bool
) -> None:
url = element.url
await self._stream_from_url(element.url, sink, bar, is_video)
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool:
next_url = url
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
# we can not match on the content type here. Instead, we disallow redirects and inspect the
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
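
The comment above describes the expiry check: request with redirects disabled and inspect where ILIAS wants to send us. In isolation (names are illustrative, not PFERD's exact code):

```
import aiohttp

async def looks_authenticated(session: aiohttp.ClientSession, url: str) -> bool:
    async with session.get(url, allow_redirects=False) as resp:
        if resp.status not in (301, 302, 303, 307, 308):
            return True  # served directly: the session is still valid
        location = resp.headers.get("Location", "")
        # ILIAS 8 delivers files via its "sendfile" command; a redirect to
        # anywhere else is taken to be the login/magazine bounce.
        return "sendfile" in location.lower()
```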
@@ -768,7 +688,7 @@ instance's greatest bottleneck.
await self.authenticate(auth_id)
if not await try_stream():
raise CrawlError(f"File streaming failed after authenticate() {element!r}")
raise CrawlError("File streaming failed after authenticate()")
async def _handle_forum(
self,
@@ -783,23 +703,36 @@ instance's greatest bottleneck.
@_iorepeat(3, "crawling forum")
@anoncritical
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasForumThread] = []
async with cl:
inner = IliasPage(await self._get_page(element.url), element)
export_url = inner.get_forum_export_url()
if not export_url:
log.warn("Could not extract forum export url")
next_stage_url = element.url
while next_stage_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element)
if next := page.get_next_stage_element():
next_stage_url = next.url
else:
break
download_data = page.get_download_forum_data()
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
export = await self._post(export_url, {
"format": "html",
"cmd[createExportFile]": ""
})
elements = parse_ilias_forum_export(soupify(export))
elements.sort(key=lambda elem: elem.title)
tasks: List[Awaitable[None]] = []
for thread in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url)))
for elem in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem)))
# And execute them
await self.gather(tasks)
@@ -809,22 +742,18 @@ instance's greatest bottleneck.
async def _download_forum_thread(
self,
parent_path: PurePath,
thread: Union[IliasForumThread, IliasPageElement],
forum_url: str
element: IliasForumThread,
) -> None:
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=thread.mtime)
if not maybe_dl or not isinstance(thread, IliasForumThread):
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
rendered = forum_thread_template(
thread.name,
forum_url,
thread.name_tag,
await self.internalize_images(thread.content_tag)
)
sink.file.write(rendered.encode("utf-8"))
content = "<!DOCTYPE html>\n"
content += element.title_tag.prettify()
content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8"))
sink.done()
async def _handle_learning_module(
@@ -848,7 +777,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url)
page = IliasPage(soup, element)
page = IliasPage(soup, element.url, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element
@@ -891,7 +820,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url)
page = IliasPage(soup, parent_element)
page = IliasPage(soup, next_element_url, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
@@ -922,13 +851,13 @@ instance's greatest bottleneck.
if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p:
prev = cast(str, os.path.relpath(prev_p, my_path.parent))
prev = os.path.relpath(prev_p, my_path.parent)
else:
prev = None
if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p:
next = cast(str, os.path.relpath(next_p, my_path.parent))
next = os.path.relpath(next_p, my_path.parent)
else:
next = None
@@ -948,15 +877,15 @@ instance's greatest bottleneck.
continue
if elem.name == "img":
if src := elem.attrs.get("src", None):
url = urljoin(self._base_url, cast(str, src))
url = urljoin(self._base_url, src)
if not url.startswith(self._base_url):
continue
log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"):
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"):
# For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"])
elem.attrs["src"] = "https:" + elem.attrs["src"]
return tag
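
`internalize_images` makes the saved HTML self-contained: each image is downloaded and its `src` swapped for a base64 data URI. The core trick, isolated:

```
import base64

image_bytes = b"\x89PNG\r\n\x1a\n..."  # stand-in for the downloaded bytes
data_uri = "data:;base64," + base64.b64encode(image_bytes).decode()
# Assigned to elem.attrs["src"], so the page no longer references the
# authenticated ILIAS URL.
```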
def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None:
@@ -968,10 +897,10 @@ instance's greatest bottleneck.
)
self._visited_urls[element.url] = parent_path
async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup:
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
soup = IliasSoup(soupify(await request.read()), str(request.url))
soup = soupify(await request.read())
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
@@ -980,13 +909,13 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request:
soup = IliasSoup(soupify(await request.read()), str(request.url))
soup = soupify(await request.read())
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
@staticmethod
def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup:
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError(
"Unexpectedly encountered ILIAS root page. "
@@ -998,19 +927,29 @@ instance's greatest bottleneck.
)
return soup
async def _post(
async def _post_authenticated(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> bytes:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
async with self.session.post(url, data=form_data()) as request:
async with self.session.post(url, data=form_data(), allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError(f"post failed with status {request.status}")
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.post(url, data=data, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
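
`_post_authenticated` (older branch) and `_get_authenticated` share the same try-authenticate-retry-once shape; its skeleton, with illustrative names:

```
from typing import Awaitable, Callable, Optional

async def with_reauth(
    attempt: Callable[[], Awaitable[Optional[bytes]]],
    authenticate: Callable[[], Awaitable[None]],
) -> bytes:
    if (result := await attempt()) is not None:
        return result
    await authenticate()  # first failure: assume the session expired
    if (result := await attempt()) is not None:
        return result     # retry exactly once after authenticating
    raise RuntimeError("request failed even after authenticating")
```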
async def _get_authenticated(self, url: str) -> bytes:
auth_id = await self._current_auth_id()
@@ -1040,22 +979,52 @@ instance's greatest bottleneck.
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
login_form = cast(Optional[Tag], login_page.find("form", attrs={"name": "login_form"}))
login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = cast(Optional[str], login_form.attrs.get("action"))
login_url = login_form.attrs.get("action")
if login_url is None:
raise CrawlError("Could not find the action URL in the login form!")
username, password = await self._auth.credentials()
login_form_data = aiohttp.FormData()
login_form_data.add_field('login_form/input_3/input_4', username)
login_form_data.add_field('login_form/input_3/input_5', password)
login_data = {
"username": username,
"password": password,
"cmd[doStandardAuthentication]": "Login",
}
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request:
soup = IliasSoup(soupify(await request.read()), str(request.url))
if not IliasPage.is_logged_in(soup):
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read())
if not self._is_logged_in(soup):
self._auth.invalidate_credentials()
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False

File diff suppressed because it is too large


@@ -1,8 +1,8 @@
from typing import Any, Optional, cast
from typing import Any, Optional
import aiohttp
import yarl
from bs4 import BeautifulSoup, Tag
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
@@ -48,8 +48,8 @@ class ShibbolethLogin:
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
@@ -59,10 +59,9 @@ class ShibbolethLogin:
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"fudis_web_authn_assertion_input": "",
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
data["csrf_token"] = csrf_token_input["value"]
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
@@ -79,14 +78,14 @@ class ShibbolethLogin:
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = form = soup.find("form", {"method": "post"})["action"] # type: ignore
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = form = soup.find("form", {"method": "post"})["action"]
data = { # using the info obtained in the while loop above
"RelayState": cast(str, relay_state["value"]),
"SAMLResponse": cast(str, saml_response["value"]),
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(cast(str, url), data=data)
await sess.post(url, data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
@@ -98,8 +97,8 @@ class ShibbolethLogin:
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
@@ -107,10 +106,10 @@ class ShibbolethLogin:
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"fudis_otp_input": tfa_token,
"j_tokenNumber": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data)
@staticmethod
@@ -121,7 +120,7 @@ class ShibbolethLogin:
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="fudiscr-form") is not None
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:

View File

@@ -3,7 +3,7 @@ import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@@ -156,11 +156,11 @@ class KitIpdCrawler(HttpCrawler):
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]:
return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex}))
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, cast(str, link_tag.get("href")))
return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
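
`findAll` is the legacy camelCase alias of `find_all`; both accept a compiled regex as an attribute filter, which is how `_file_regex` picks out download links. A minimal demonstration with illustrative markup:

```
import re
from bs4 import BeautifulSoup

soup = BeautifulSoup('<a href="notes.pdf">a</a><a href="index.html">b</a>', "html.parser")
file_regex = re.compile(r"\.pdf$")

links = soup.find_all(name="a", attrs={"href": file_regex})
assert [link["href"] for link in links] == ["notes.pdf"]
```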


@@ -1,8 +1,9 @@
import asyncio
import sys
import traceback
from contextlib import AbstractContextManager, asynccontextmanager, contextmanager
from typing import AsyncIterator, Iterator, List, Optional
from contextlib import asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, Group
from rich.live import Live
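
The import swap resolves the TODO above: `typing.ContextManager` has been a deprecated alias since Python 3.9, and `contextlib.AbstractContextManager` now fills the same annotation role. For example:

```
from contextlib import AbstractContextManager, contextmanager
from typing import Iterator

@contextmanager
def progress_bar() -> Iterator[str]:
    yield "bar"

def open_bar() -> AbstractContextManager[str]:
    # Same role typing.ContextManager[str] used to play.
    return progress_bar()

with open_bar() as bar:
    assert bar == "bar"
```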
@@ -260,7 +261,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str,
text: str,
total: Optional[float] = None,
) -> AbstractContextManager[ProgressBar]:
) -> ContextManager[ProgressBar]:
"""
Allows markup in the "style" argument which will be applied to the
"action" string.
@@ -276,7 +277,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str,
text: str,
total: Optional[float] = None,
) -> AbstractContextManager[ProgressBar]:
) -> ContextManager[ProgressBar]:
"""
Allows markup in the "style" argument which will be applied to the
"action" string.


@@ -371,22 +371,6 @@ class OutputDirectory:
raise OutputDirError("Failed to create temporary file")
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path)
return self._should_download(local_path, heuristics, redownload, on_conflict)
async def download(
self,
remote_path: PurePath,


@@ -1,5 +1,6 @@
from pathlib import Path, PurePath
from typing import Dict, List, Optional
from urllib.parse import quote
from rich.markup import escape
@@ -170,7 +171,9 @@ class Pferd:
def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters
link = crawler.output_dir.resolve(relative_path).absolute().as_uri()
absolute_path = str(crawler.output_dir.resolve(relative_path).absolute())
absolute_path = absolute_path.replace("\\\\?\\", "")
link = f"file://{quote(absolute_path)}"
return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False
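
This hunk is the commit's headline change: `Path.as_uri()` replaces the hand-built `file://` link, which on Windows had to strip the `\\?\` long-path prefix first. Roughly (the asserts assume a POSIX path):

```
from pathlib import Path
from urllib.parse import quote

path = Path("/tmp/course files/slides 01.pdf")

# Newer branch: pathlib handles percent-encoding and drive letters.
assert path.as_uri() == "file:///tmp/course%20files/slides%2001.pdf"

# Older branch: hand-rolled URI; on Windows the absolute path may carry the
# \\?\ extended-path prefix, hence the stripping before prepending file://.
raw = str(path).replace("\\\\?\\", "")
assert f"file://{quote(raw)}" == path.as_uri()
```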


@@ -34,6 +34,15 @@ class MarkConflictError(Exception):
self.collides_with = collides_with
# TODO Use PurePath.is_relative_to when updating to 3.9
def is_relative_to(a: PurePath, b: PurePath) -> bool:
try:
a.relative_to(b)
return True
except ValueError:
return False
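
The helper added here backports `PurePath.is_relative_to`, which only exists since Python 3.9, on top of `relative_to` raising `ValueError`:

```
from pathlib import PurePath

a = PurePath("course/week1/slides.pdf")
b = PurePath("course/week1")

assert a.is_relative_to(b)      # Python 3.9+
assert not b.is_relative_to(a)

# The backport reproduces exactly this behaviour:
try:
    b.relative_to(a)
except ValueError:
    pass  # not relative
```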
class Report:
"""
A report of a synchronization. Includes all files found by the crawler, as
@@ -164,7 +173,7 @@ class Report:
if path == other:
raise MarkDuplicateError(path)
if path.is_relative_to(other) or other.is_relative_to(path):
if is_relative_to(path, other) or is_relative_to(other, path):
raise MarkConflictError(path, other)
self.known_files.add(path)


@@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.8.2"
VERSION = "3.7.0"


@@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip
Ensure you have at least Python 3.11 installed. Run the following command to
Ensure you have at least Python 3.9 installed. Run the following command to
install PFERD or upgrade it to the latest version:
```

flake.lock (generated)

@@ -2,16 +2,16 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1744440957,
"narHash": "sha256-FHlSkNqFmPxPJvy+6fNLaNeWnF1lZSgqVCl/eWaJRc4=",
"lastModified": 1708979614,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "26d499fc9f1d567283d5d56fcf367edd815dba1d",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-24.11",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}


@@ -2,7 +2,7 @@
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
};
outputs = { self, nixpkgs }:


@@ -12,7 +12,7 @@ dependencies = [
"certifi>=2021.10.8"
]
dynamic = ["version"]
requires-python = ">=3.11"
requires-python = ">=3.9"
[project.scripts]
pferd = "PFERD.__main__:main"