Compare commits

..

20 Commits

Author SHA1 Message Date
77a23265a9 Bump version to 3.8.2 2025-04-29 17:55:57 +02:00
4c230ef6dd Fix exercise crawling 2025-04-25 13:45:57 +02:00
b305e1ce23 Fix login using the native ilias login form 2025-04-23 16:08:45 +02:00
bdf17f5c87 Ignore wikis 2025-04-23 16:03:37 +02:00
77fce7daf8 Bump version to 3.8.1 2025-04-17 11:22:35 +02:00
653bf139f0 Fix encoding of descriptions and force images to light mode 2025-04-16 10:52:18 +02:00
3f60638d33 Bump version to 3.8.0 2025-04-16 00:47:05 +02:00
b97b6fae6b Update minimum Python version to 3.11 2025-04-15 21:35:20 +02:00
477234ad0d Support ILIAS 9 2025-04-15 21:35:20 +02:00
63f25277b0 Fix crawling of empty forum threads 2025-03-09 23:44:25 +01:00
c8eff04ae0 Make thread titles link to original ILIAS thread 2025-02-19 16:23:20 +01:00
edc482cdf4 Internalize images in forum threads 2025-02-19 16:23:20 +01:00
72cd0f77e2 Prettify forum thread exports
Co-authored-by: Tim <me@scriptim.dev>
2025-02-19 16:23:20 +01:00
be175f9347 Download only new/updated forum threads 2025-02-19 16:16:37 +01:00
ba2833dba5 Crawl all threads in a forum
Before this patch the row count was unconditionally changed to 800. This
patch tries to detect how many rows the forum has and then fetches this
amount, if it is larger than 800.
2025-02-19 12:19:33 +01:00
2f0e792670 Increase default http timeout to 30
Otherwise larger forums will fail to download in time
2025-02-19 12:19:13 +01:00
5f88539f7e Fix page size increase for forum threads 2025-02-19 12:19:11 +01:00
bd9d7efe64 "Fix" mypy errors
Thank you mypy, very cool. These types make things *so much better*.
They don't just complicate everything and don't really help because they
can not detect that an element queried by a tag is no navigable
string...
2025-02-19 12:15:41 +01:00
16a2dd5b15 fix: totp 2025-02-19 12:15:41 +01:00
678283d341 Use Python facilities to convert paths to file:// urls 2024-11-15 00:09:11 +01:00
20 changed files with 1124 additions and 666 deletions

View File

@ -14,7 +14,7 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest, macos-13, macos-latest] os: [ubuntu-latest, windows-latest, macos-13, macos-latest]
python: ["3.9"] python: ["3.11"]
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

View File

@ -22,6 +22,37 @@ ambiguous situations.
## Unreleased ## Unreleased
## 3.8.2 - 2025-04-29
## Changed
- Explicitly mention that wikis are not supported at the moment and ignore them
## Fixed
- Ilias-native login
- Exercise crawling
## 3.8.1 - 2025-04-17
## Fixed
- Description html files now specify at UTF-8 encoding
- Images in descriptions now always have a white background
## 3.8.0 - 2025-04-16
### Added
- Support for ILIAS 9
### Changed
- Added prettier CSS to forum threads
- Downloaded forum threads now link to the forum instead of the ILIAS thread
- Increase minimum supported Python version to 3.11
- Do not crawl nested courses (courses linked in other courses)
## Fixed
- File links in report on Windows
- TOTP authentication in KIT Shibboleth
- Forum crawling only considering the first 20 entries
## 3.7.0 - 2024-11-13 ## 3.7.0 - 2024-11-13
### Added ### Added

View File

@ -1,4 +1,4 @@
from typing import Optional, Tuple from typing import Optional, Tuple, cast
import keyring import keyring
@ -13,7 +13,7 @@ class KeyringAuthSection(AuthSection):
return self.s.get("username") return self.s.get("username")
def keyring_name(self) -> str: def keyring_name(self) -> str:
return self.s.get("keyring_name", fallback=NAME) return cast(str, self.s.get("keyring_name", fallback=NAME))
class KeyringAuthenticator(Authenticator): class KeyringAuthenticator(Authenticator):

View File

@ -149,9 +149,7 @@ class CrawlerSection(Section):
return self.s.getboolean("skip", fallback=False) return self.s.getboolean("skip", fallback=False)
def output_dir(self, name: str) -> Path: def output_dir(self, name: str) -> Path:
# TODO Use removeprefix() after switching to 3.9 name = name.removeprefix("crawl:")
if name.startswith("crawl:"):
name = name[len("crawl:"):]
return Path(self.s.get("output_dir", name)).expanduser() return Path(self.s.get("output_dir", name)).expanduser()
def redownload(self) -> Redownload: def redownload(self) -> Redownload:
@ -294,6 +292,35 @@ class Crawler(ABC):
log.explain("Answer: Yes") log.explain("Answer: Yes")
return CrawlToken(self._limiter, path) return CrawlToken(self._limiter, path)
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
log.explain_topic(f"Decision: Should Download {fmt_path(path)}")
if self._transformer.transform(path) is None:
log.explain("Answer: No (ignored)")
return False
should_download = self._output_dir.should_try_download(
path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if should_download:
log.explain("Answer: Yes")
return True
else:
log.explain("Answer: No")
return False
async def download( async def download(
self, self,
path: PurePath, path: PurePath,

View File

@ -3,7 +3,7 @@ import http.cookies
import ssl import ssl
from datetime import datetime from datetime import datetime
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple, cast
import aiohttp import aiohttp
import certifi import certifi
@ -22,7 +22,7 @@ ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection): class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float: def http_timeout(self) -> float:
return self.s.getfloat("http_timeout", fallback=20) return self.s.getfloat("http_timeout", fallback=30)
class HttpCrawler(Crawler): class HttpCrawler(Crawler):
@ -187,12 +187,12 @@ class HttpCrawler(Crawler):
if level == 0 or (level == 1 and drop_h1): if level == 0 or (level == 1 and drop_h1):
return PurePath() return PurePath()
level_heading = tag.find_previous(name=f"h{level}") level_heading = cast(Optional[Tag], tag.find_previous(name=f"h{level}"))
if level_heading is None: if level_heading is None:
return find_associated_headings(tag, level - 1) return find_associated_headings(tag, level - 1)
folder_name = level_heading.getText().strip() folder_name = level_heading.get_text().strip()
return find_associated_headings(level_heading, level - 1) / folder_name return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names # start at level <h3> because paragraph-level headings are usually too granular for folder names
@ -231,6 +231,7 @@ class HttpCrawler(Crawler):
etag_header = resp.headers.get("ETag") etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified") last_modified_header = resp.headers.get("Last-Modified")
last_modified = None
if last_modified_header: if last_modified_header:
try: try:

View File

@ -1,5 +1,5 @@
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional, cast
import bs4 import bs4
@ -126,6 +126,88 @@ _learning_module_template = """
</html> </html>
""" """
_forum_thread_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ILIAS - Forum: {{name}}</title>
<style>
* {
box-sizing: border-box;
}
body {
font-family: 'Open Sans', Verdana, Arial, Helvetica, sans-serif;
padding: 8px;
}
ul, ol, p {
margin: 1.2em 0;
}
p {
margin-top: 8px;
margin-bottom: 8px;
}
a {
color: #00876c;
text-decoration: none;
cursor: pointer;
}
a:hover {
text-decoration: underline;
}
body > p:first-child > span:first-child {
font-size: 1.6em;
}
body > p:first-child > span:first-child ~ span.default {
display: inline-block;
font-size: 1.2em;
padding-bottom: 8px;
}
.ilFrmPostContent {
margin-top: 8px;
max-width: 64em;
}
.ilFrmPostContent > *:first-child {
margin-top: 0px;
}
.ilFrmPostTitle {
margin-top: 24px;
color: #00876c;
font-weight: bold;
}
#ilFrmPostList {
list-style: none;
padding-left: 0;
}
li.ilFrmPostRow {
padding: 3px 0 3px 3px;
margin-bottom: 24px;
border-left: 6px solid #dddddd;
}
.ilFrmPostRow > div {
display: flex;
}
.ilFrmPostImage img {
margin: 0 !important;
padding: 6px 9px 9px 6px;
}
.ilUserIcon {
width: 115px;
}
.small {
text-decoration: none;
font-size: 0.75rem;
color: #6f6f6f;
}
</style>
</head>
<body>
{{heading}}
{{content}}
</body>
</html>
""".strip() # noqa: E501 line too long
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str: def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those. # Seems to be comments, ignore those.
@ -139,13 +221,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
</div> </div>
""" """
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"): if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip() text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip()
left = f'<a href="{prev}">{text}</a>' left = f'<a href="{prev}">{text}</a>'
else: else:
left = "<span></span>" left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"): if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip() text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip()
right = f'<a href="{next}">{text}</a>' right = f'<a href="{next}">{text}</a>'
else: else:
right = "<span></span>" right = "<span></span>"
@ -160,8 +242,17 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
"{{left}}", left).replace("{{right}}", right).encode()) "{{left}}", left).replace("{{right}}", right).encode())
) )
body = body.prettify() body_str = cast(str, body.prettify())
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name) return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name)
def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str:
if title := cast(Optional[bs4.Tag], heading.find(name="b")):
title.wrap(bs4.Tag(name="a", attrs={"href": url}))
return _forum_thread_template \
.replace("{{name}}", name) \
.replace("{{heading}}", cast(str, heading.prettify())) \
.replace("{{content}}", cast(str, content.prettify()))
class Links(Enum): class Links(Enum):

View File

@ -1,3 +1,5 @@
from typing import cast
from bs4 import BeautifulSoup, Comment, Tag from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """ _STYLE_TAG_CONTENT = """
@ -37,6 +39,10 @@ _STYLE_TAG_CONTENT = """
margin: 0.5rem 0; margin: 0.5rem 0;
} }
img {
background-color: white;
}
body { body {
padding: 1em; padding: 1em;
grid-template-columns: 1fr min(60rem, 90%) 1fr; grid-template-columns: 1fr min(60rem, 90%) 1fr;
@ -54,12 +60,11 @@ _ARTICLE_WORTHY_CLASSES = [
def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
head = soup.new_tag("head") head = soup.new_tag("head")
soup.insert(0, head) soup.insert(0, head)
# Force UTF-8 encoding
head.append(soup.new_tag("meta", charset="utf-8"))
simplecss_link: Tag = soup.new_tag("link")
# <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css"> # <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css">
simplecss_link["rel"] = "stylesheet" head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css"))
simplecss_link["href"] = "https://cdn.simplecss.org/simple.css"
head.append(simplecss_link)
# Basic style tags for compat # Basic style tags for compat
style: Tag = soup.new_tag("style") style: Tag = soup.new_tag("style")
@ -70,18 +75,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
def clean(soup: BeautifulSoup) -> BeautifulSoup: def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES): for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)):
block.name = "article" block.name = "article"
for block in soup.find_all("h3"): for block in cast(list[Tag], soup.find_all("h3")):
block.name = "div" block.name = "div"
for block in soup.find_all("h1"): for block in cast(list[Tag], soup.find_all("h1")):
block.name = "h3" block.name = "h3"
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"): for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")):
block.name = "h3" block.name = "h3"
block["class"] += ["accordion-head"] block["class"] += ["accordion-head"] # type: ignore
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"): for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children) children = list(dummy.children)
@ -97,7 +102,7 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if figure := video.find_parent("figure"): if figure := video.find_parent("figure"):
figure.decompose() figure.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"): for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")):
hrule_imposter.insert(0, soup.new_tag("hr")) hrule_imposter.insert(0, soup.new_tag("hr"))
return soup return soup

View File

@ -19,10 +19,10 @@ from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .async_helper import _iorepeat from .async_helper import _iorepeat
from .file_templates import Links, learning_module_template from .file_templates import Links, forum_thread_template, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int] TargetType = Union[str, int]
@ -105,9 +105,9 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
_DIRECTORY_PAGES: Set[IliasElementType] = { _DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.COURSE,
IliasElementType.EXERCISE, IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES, IliasElementType.EXERCISE_FILES,
IliasElementType.EXERCISE_OVERVIEW,
IliasElementType.FOLDER, IliasElementType.FOLDER,
IliasElementType.INFO_TAB, IliasElementType.INFO_TAB,
IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.MEDIACAST_VIDEO_FOLDER,
@ -217,11 +217,19 @@ instance's greatest bottleneck.
async def _crawl_desktop(self) -> None: async def _crawl_desktop(self) -> None:
await self._crawl_url( await self._crawl_url(
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show") urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"),
crawl_nested_courses=True
) )
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: async def _crawl_url(
if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id): self,
url: str,
expected_id: Optional[int] = None,
crawl_nested_courses: bool = False
) -> None:
if awaitable := await self._handle_ilias_page(
url, None, PurePath("."), expected_id, crawl_nested_courses
):
await awaitable await awaitable
async def _handle_ilias_page( async def _handle_ilias_page(
@ -230,6 +238,7 @@ instance's greatest bottleneck.
current_element: Optional[IliasPageElement], current_element: Optional[IliasPageElement],
path: PurePath, path: PurePath,
expected_course_id: Optional[int] = None, expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path) maybe_cl = await self.crawl(path)
if not maybe_cl: if not maybe_cl:
@ -237,7 +246,9 @@ instance's greatest bottleneck.
if current_element: if current_element:
self._ensure_not_seen(current_element, path) self._ensure_not_seen(current_element, path)
return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id) return self._crawl_ilias_page(
url, current_element, maybe_cl, expected_course_id, crawl_nested_courses
)
@anoncritical @anoncritical
async def _crawl_ilias_page( async def _crawl_ilias_page(
@ -246,6 +257,7 @@ instance's greatest bottleneck.
current_element: Optional[IliasPageElement], current_element: Optional[IliasPageElement],
cl: CrawlToken, cl: CrawlToken,
expected_course_id: Optional[int] = None, expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False,
) -> None: ) -> None:
elements: List[IliasPageElement] = [] elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes # A list as variable redefinitions are not propagated to outer scopes
@ -257,6 +269,7 @@ instance's greatest bottleneck.
async with cl: async with cl:
next_stage_url: Optional[str] = url next_stage_url: Optional[str] = url
current_parent = current_element current_parent = current_element
page = None
while next_stage_url: while next_stage_url:
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
@ -266,18 +279,19 @@ instance's greatest bottleneck.
# If we expect to find a root course, enforce it # If we expect to find a root course, enforce it
if current_parent is None and expected_course_id is not None: if current_parent is None and expected_course_id is not None:
perma_link = IliasPage.get_soup_permalink(soup) perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link: if not perma_link or "crs/" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course") raise CrawlError("Invalid course id? Didn't find anything looking like a course")
if str(expected_course_id) not in perma_link: if str(expected_course_id) not in perma_link:
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}") raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
page = IliasPage(soup, next_stage_url, current_parent) page = IliasPage(soup, current_parent)
if next_element := page.get_next_stage_element(): if next_element := page.get_next_stage_element():
current_parent = next_element current_parent = next_element
next_stage_url = next_element.url next_stage_url = next_element.url
else: else:
next_stage_url = None next_stage_url = None
page = cast(IliasPage, page)
elements.extend(page.get_child_elements()) elements.extend(page.get_child_elements())
if description_string := page.get_description(): if description_string := page.get_description():
description.append(description_string) description.append(description_string)
@ -292,7 +306,7 @@ instance's greatest bottleneck.
tasks: List[Awaitable[None]] = [] tasks: List[Awaitable[None]] = []
for element in elements: for element in elements:
if handle := await self._handle_ilias_element(cl.path, element): if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses):
tasks.append(asyncio.create_task(handle)) tasks.append(asyncio.create_task(handle))
# And execute them # And execute them
@ -308,6 +322,7 @@ instance's greatest bottleneck.
self, self,
parent_path: PurePath, parent_path: PurePath,
element: IliasPageElement, element: IliasPageElement,
crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
# element.name might contain `/` if the crawler created nested elements, # element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case # so we can not sanitize it here. We trust in the output dir to thwart worst-case
@ -360,6 +375,64 @@ instance's greatest bottleneck.
"[bright_black](scorm learning modules are not supported)" "[bright_black](scorm learning modules are not supported)"
) )
return None return None
elif element.type == IliasElementType.LITERATURE_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](literature lists are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](HTML learning modules are not supported)"
)
return None
elif element.type == IliasElementType.BLOG:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](blogs are not currently supported)"
)
return None
elif element.type == IliasElementType.DCL_RECORD_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](dcl record lists are not currently supported)"
)
return None
elif element.type == IliasElementType.MEDIA_POOL:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](media pools are not currently supported)"
)
return None
elif element.type == IliasElementType.COURSE:
if crawl_nested_courses:
return await self._handle_ilias_page(element.url, element, element_path)
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](not descending into linked course)"
)
return None
elif element.type == IliasElementType.WIKI:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](wikis are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE: elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path) return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK: elif element.type == IliasElementType.LINK:
@ -461,10 +534,10 @@ instance's greatest bottleneck.
if not dl: if not dl:
return return
async with dl as (bar, sink): async with dl as (_bar, sink):
description = clean(insert_base_markup(description)) description = clean(insert_base_markup(description))
description = await self.internalize_images(description) description_tag = await self.internalize_images(description)
sink.file.write(description.prettify().encode("utf-8")) sink.file.write(cast(str, description_tag.prettify()).encode("utf-8"))
sink.done() sink.done()
@anoncritical @anoncritical
@ -483,7 +556,7 @@ instance's greatest bottleneck.
async with self.session.get(export_url, allow_redirects=False) as resp: async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated # No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers: if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip() return soupify(await resp.read()).select_one("a").get("href").strip() # type: ignore
# We are either unauthenticated or the link is not active # We are either unauthenticated or the link is not active
new_url = resp.headers[hdrs.LOCATION].lower() new_url = resp.headers[hdrs.LOCATION].lower()
if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url: if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url:
@ -588,7 +661,7 @@ instance's greatest bottleneck.
) )
async with dl as (bar, sink): async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element) page = IliasPage(await self._get_page(element.url), element)
stream_elements = page.get_child_elements() stream_elements = page.get_child_elements()
if len(stream_elements) > 1: if len(stream_elements) > 1:
@ -598,7 +671,7 @@ instance's greatest bottleneck.
stream_element = stream_elements[0] stream_element = stream_elements[0]
# We do not have a local cache yet # We do not have a local cache yet
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))]) add_to_report([str(self._transformer.transform(dl.path))])
return return
@ -613,7 +686,7 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}") log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element, sink, bar, is_video=True)
add_to_report(contained_video_paths) add_to_report(contained_video_paths)
@ -635,12 +708,19 @@ instance's greatest bottleneck.
async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None: async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
assert dl # The function is only reached when dl is not None assert dl # The function is only reached when dl is not None
async with dl as (bar, sink): async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video) await self._stream_from_url(element, sink, bar, is_video)
async def _stream_from_url(
self,
element: IliasPageElement,
sink: FileSink,
bar: ProgressBar,
is_video: bool
) -> None:
url = element.url
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool: async def try_stream() -> bool:
next_url = url next_url = url
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML, # Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
# we can not match on the content type here. Instead, we disallow redirects and inspect the # we can not match on the content type here. Instead, we disallow redirects and inspect the
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
@ -688,7 +768,7 @@ instance's greatest bottleneck.
await self.authenticate(auth_id) await self.authenticate(auth_id)
if not await try_stream(): if not await try_stream():
raise CrawlError("File streaming failed after authenticate()") raise CrawlError(f"File streaming failed after authenticate() {element!r}")
async def _handle_forum( async def _handle_forum(
self, self,
@ -703,36 +783,23 @@ instance's greatest bottleneck.
@_iorepeat(3, "crawling forum") @_iorepeat(3, "crawling forum")
@anoncritical @anoncritical
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None: async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasForumThread] = []
async with cl: async with cl:
next_stage_url = element.url inner = IliasPage(await self._get_page(element.url), element)
while next_stage_url: export_url = inner.get_forum_export_url()
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") if not export_url:
log.explain(f"URL: {next_stage_url}") log.warn("Could not extract forum export url")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element)
if next := page.get_next_stage_element():
next_stage_url = next.url
else:
break
download_data = page.get_download_forum_data()
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
return return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
elements.sort(key=lambda elem: elem.title) export = await self._post(export_url, {
"format": "html",
"cmd[createExportFile]": ""
})
elements = parse_ilias_forum_export(soupify(export))
tasks: List[Awaitable[None]] = [] tasks: List[Awaitable[None]] = []
for elem in elements: for thread in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem))) tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url)))
# And execute them # And execute them
await self.gather(tasks) await self.gather(tasks)
@ -742,18 +809,22 @@ instance's greatest bottleneck.
async def _download_forum_thread( async def _download_forum_thread(
self, self,
parent_path: PurePath, parent_path: PurePath,
element: IliasForumThread, thread: Union[IliasForumThread, IliasPageElement],
forum_url: str
) -> None: ) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html") path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime) maybe_dl = await self.download(path, mtime=thread.mtime)
if not maybe_dl: if not maybe_dl or not isinstance(thread, IliasForumThread):
return return
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
content = "<!DOCTYPE html>\n" rendered = forum_thread_template(
content += element.title_tag.prettify() thread.name,
content += element.content_tag.prettify() forum_url,
sink.file.write(content.encode("utf-8")) thread.name_tag,
await self.internalize_images(thread.content_tag)
)
sink.file.write(rendered.encode("utf-8"))
sink.done() sink.done()
async def _handle_learning_module( async def _handle_learning_module(
@ -777,7 +848,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}") log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url) soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element) page = IliasPage(soup, element)
if next := page.get_learning_module_data(): if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction( elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element cl.path, next.previous_url, "left", element
@ -820,7 +891,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}") log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url) soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element) page = IliasPage(soup, parent_element)
if next := page.get_learning_module_data(): if next := page.get_learning_module_data():
elements.append(next) elements.append(next)
if dir == "left": if dir == "left":
@ -851,13 +922,13 @@ instance's greatest bottleneck.
if prev: if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p: if prev_p:
prev = os.path.relpath(prev_p, my_path.parent) prev = cast(str, os.path.relpath(prev_p, my_path.parent))
else: else:
prev = None prev = None
if next: if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p: if next_p:
next = os.path.relpath(next_p, my_path.parent) next = cast(str, os.path.relpath(next_p, my_path.parent))
else: else:
next = None next = None
@ -877,15 +948,15 @@ instance's greatest bottleneck.
continue continue
if elem.name == "img": if elem.name == "img":
if src := elem.attrs.get("src", None): if src := elem.attrs.get("src", None):
url = urljoin(self._base_url, src) url = urljoin(self._base_url, cast(str, src))
if not url.startswith(self._base_url): if not url.startswith(self._base_url):
continue continue
log.explain(f"Internalizing {url!r}") log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url) img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"): if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"):
# For unknown reasons the protocol seems to be stripped. # For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + elem.attrs["src"] elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"])
return tag return tag
def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None: def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None:
@ -897,10 +968,10 @@ instance's greatest bottleneck.
) )
self._visited_urls[element.url] = parent_path self._visited_urls[element.url] = parent_path
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup: async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup:
auth_id = await self._current_auth_id() auth_id = await self._current_auth_id()
async with self.session.get(url) as request: async with self.session.get(url) as request:
soup = soupify(await request.read()) soup = IliasSoup(soupify(await request.read()), str(request.url))
if IliasPage.is_logged_in(soup): if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
@ -909,13 +980,13 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die. # Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request: async with self.session.get(url) as request:
soup = soupify(await request.read()) soup = IliasSoup(soupify(await request.read()), str(request.url))
if IliasPage.is_logged_in(soup): if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}") raise CrawlError(f"get_page failed even after authenticating on {url!r}")
@staticmethod @staticmethod
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup: def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed: if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError( raise CrawlError(
"Unexpectedly encountered ILIAS root page. " "Unexpectedly encountered ILIAS root page. "
@ -927,29 +998,19 @@ instance's greatest bottleneck.
) )
return soup return soup
async def _post_authenticated( async def _post(
self, self,
url: str, url: str,
data: dict[str, Union[str, List[str]]] data: dict[str, Union[str, List[str]]]
) -> bytes: ) -> bytes:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData() form_data = aiohttp.FormData()
for key, val in data.items(): for key, val in data.items():
form_data.add_field(key, val) form_data.add_field(key, val)
async with self.session.post(url, data=form_data(), allow_redirects=False) as request: async with self.session.post(url, data=form_data()) as request:
if request.status == 200: if request.status == 200:
return await request.read() return await request.read()
raise CrawlError(f"post failed with status {request.status}")
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.post(url, data=data, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
async def _get_authenticated(self, url: str) -> bytes: async def _get_authenticated(self, url: str) -> bytes:
auth_id = await self._current_auth_id() auth_id = await self._current_auth_id()
@ -979,52 +1040,22 @@ instance's greatest bottleneck.
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read()) login_page = soupify(await request.read())
login_form = login_page.find("form", attrs={"name": "formlogin"}) login_form = cast(Optional[Tag], login_page.find("form", attrs={"name": "login_form"}))
if login_form is None: if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.") raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action") login_url = cast(Optional[str], login_form.attrs.get("action"))
if login_url is None: if login_url is None:
raise CrawlError("Could not find the action URL in the login form!") raise CrawlError("Could not find the action URL in the login form!")
username, password = await self._auth.credentials() username, password = await self._auth.credentials()
login_data = { login_form_data = aiohttp.FormData()
"username": username, login_form_data.add_field('login_form/input_3/input_4', username)
"password": password, login_form_data.add_field('login_form/input_3/input_5', password)
"cmd[doStandardAuthentication]": "Login",
}
# do the actual login # do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request: async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request:
soup = soupify(await request.read()) soup = IliasSoup(soupify(await request.read()), str(request.url))
if not self._is_logged_in(soup): if not IliasPage.is_logged_in(soup):
self._auth.invalidate_credentials() self._auth.invalidate_credentials()
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
from typing import Any, Optional from typing import Any, Optional, cast
import aiohttp import aiohttp
import yarl import yarl
from bs4 import BeautifulSoup from bs4 import BeautifulSoup, Tag
from ...auth import Authenticator, TfaAuthenticator from ...auth import Authenticator, TfaAuthenticator
from ...logging import log from ...logging import log
@ -48,8 +48,8 @@ class ShibbolethLogin:
while not self._login_successful(soup): while not self._login_successful(soup):
# Searching the form here so that this fails before asking for # Searching the form here so that this fails before asking for
# credentials rather than after asking. # credentials rather than after asking.
form = soup.find("form", {"method": "post"}) form = cast(Tag, soup.find("form", {"method": "post"}))
action = form["action"] action = cast(str, form["action"])
# Equivalent: Enter credentials in # Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
@ -59,9 +59,10 @@ class ShibbolethLogin:
"_eventId_proceed": "", "_eventId_proceed": "",
"j_username": username, "j_username": username,
"j_password": password, "j_password": password,
"fudis_web_authn_assertion_input": "",
} }
if csrf_token_input := form.find("input", {"name": "csrf_token"}): if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] data["csrf_token"] = csrf_token_input["value"] # type: ignore
soup = await _post(sess, url, data) soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"): if soup.find(id="attributeRelease"):
@ -78,14 +79,14 @@ class ShibbolethLogin:
# Equivalent: Being redirected via JS automatically # Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled) # (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"}) relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = soup.find("input", {"name": "SAMLResponse"}) saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = form = soup.find("form", {"method": "post"})["action"] url = form = soup.find("form", {"method": "post"})["action"] # type: ignore
data = { # using the info obtained in the while loop above data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"], "RelayState": cast(str, relay_state["value"]),
"SAMLResponse": saml_response["value"], "SAMLResponse": cast(str, saml_response["value"]),
} }
await sess.post(url, data=data) await sess.post(cast(str, url), data=data)
async def _authenticate_tfa( async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
@ -97,8 +98,8 @@ class ShibbolethLogin:
# Searching the form here so that this fails before asking for # Searching the form here so that this fails before asking for
# credentials rather than after asking. # credentials rather than after asking.
form = soup.find("form", {"method": "post"}) form = cast(Tag, soup.find("form", {"method": "post"}))
action = form["action"] action = cast(str, form["action"])
# Equivalent: Enter token in # Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
@ -106,10 +107,10 @@ class ShibbolethLogin:
username, password = await self._auth.credentials() username, password = await self._auth.credentials()
data = { data = {
"_eventId_proceed": "", "_eventId_proceed": "",
"j_tokenNumber": tfa_token, "fudis_otp_input": tfa_token,
} }
if csrf_token_input := form.find("input", {"name": "csrf_token"}): if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] data["csrf_token"] = csrf_token_input["value"] # type: ignore
return await _post(session, url, data) return await _post(session, url, data)
@staticmethod @staticmethod
@ -120,7 +121,7 @@ class ShibbolethLogin:
@staticmethod @staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool: def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None return soup.find(id="fudiscr-form") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup: async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from pathlib import PurePath from pathlib import PurePath
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast
from urllib.parse import urljoin from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -156,11 +156,11 @@ class KitIpdCrawler(HttpCrawler):
name = os.path.basename(url) name = os.path.basename(url)
return KitIpdFile(name, url) return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]: def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex}) return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex}))
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str: def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href")) return urljoin(url, cast(str, link_tag.get("href")))
async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None: async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp: async with self.session.get(url, allow_redirects=False) as resp:

View File

@ -1,9 +1,8 @@
import asyncio import asyncio
import sys import sys
import traceback import traceback
from contextlib import asynccontextmanager, contextmanager from contextlib import AbstractContextManager, asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated from typing import AsyncIterator, Iterator, List, Optional
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, Group from rich.console import Console, Group
from rich.live import Live from rich.live import Live
@ -261,7 +260,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str, action: str,
text: str, text: str,
total: Optional[float] = None, total: Optional[float] = None,
) -> ContextManager[ProgressBar]: ) -> AbstractContextManager[ProgressBar]:
""" """
Allows markup in the "style" argument which will be applied to the Allows markup in the "style" argument which will be applied to the
"action" string. "action" string.
@ -277,7 +276,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str, action: str,
text: str, text: str,
total: Optional[float] = None, total: Optional[float] = None,
) -> ContextManager[ProgressBar]: ) -> AbstractContextManager[ProgressBar]:
""" """
Allows markup in the "style" argument which will be applied to the Allows markup in the "style" argument which will be applied to the
"action" string. "action" string.

View File

@ -371,6 +371,22 @@ class OutputDirectory:
raise OutputDirError("Failed to create temporary file") raise OutputDirError("Failed to create temporary file")
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path)
return self._should_download(local_path, heuristics, redownload, on_conflict)
async def download( async def download(
self, self,
remote_path: PurePath, remote_path: PurePath,

View File

@ -1,6 +1,5 @@
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Dict, List, Optional from typing import Dict, List, Optional
from urllib.parse import quote
from rich.markup import escape from rich.markup import escape
@ -171,9 +170,7 @@ class Pferd:
def fmt_path_link(relative_path: PurePath) -> str: def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters # We need to URL-encode the path because it might contain spaces or special characters
absolute_path = str(crawler.output_dir.resolve(relative_path).absolute()) link = crawler.output_dir.resolve(relative_path).absolute().as_uri()
absolute_path = absolute_path.replace("\\\\?\\", "")
link = f"file://{quote(absolute_path)}"
return f"[link={link}]{fmt_path(relative_path)}[/link]" return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False something_changed = False

View File

@ -34,15 +34,6 @@ class MarkConflictError(Exception):
self.collides_with = collides_with self.collides_with = collides_with
# TODO Use PurePath.is_relative_to when updating to 3.9
def is_relative_to(a: PurePath, b: PurePath) -> bool:
try:
a.relative_to(b)
return True
except ValueError:
return False
class Report: class Report:
""" """
A report of a synchronization. Includes all files found by the crawler, as A report of a synchronization. Includes all files found by the crawler, as
@ -173,7 +164,7 @@ class Report:
if path == other: if path == other:
raise MarkDuplicateError(path) raise MarkDuplicateError(path)
if is_relative_to(path, other) or is_relative_to(other, path): if path.is_relative_to(other) or other.is_relative_to(path):
raise MarkConflictError(path, other) raise MarkConflictError(path, other)
self.known_files.add(path) self.known_files.add(path)

View File

@ -1,2 +1,2 @@
NAME = "PFERD" NAME = "PFERD"
VERSION = "3.7.0" VERSION = "3.8.2"

View File

@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip ### With pip
Ensure you have at least Python 3.9 installed. Run the following command to Ensure you have at least Python 3.11 installed. Run the following command to
install PFERD or upgrade it to the latest version: install PFERD or upgrade it to the latest version:
``` ```

8
flake.lock generated
View File

@ -2,16 +2,16 @@
"nodes": { "nodes": {
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1708979614, "lastModified": 1744440957,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=", "narHash": "sha256-FHlSkNqFmPxPJvy+6fNLaNeWnF1lZSgqVCl/eWaJRc4=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a", "rev": "26d499fc9f1d567283d5d56fcf367edd815dba1d",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-23.11", "ref": "nixos-24.11",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }

View File

@ -2,7 +2,7 @@
description = "Tool for downloading course-related files from ILIAS"; description = "Tool for downloading course-related files from ILIAS";
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
}; };
outputs = { self, nixpkgs }: outputs = { self, nixpkgs }:

View File

@ -12,7 +12,7 @@ dependencies = [
"certifi>=2021.10.8" "certifi>=2021.10.8"
] ]
dynamic = ["version"] dynamic = ["version"]
requires-python = ">=3.9" requires-python = ">=3.11"
[project.scripts] [project.scripts]
pferd = "PFERD.__main__:main" pferd = "PFERD.__main__:main"