Compare commits

...

43 Commits

Author SHA1 Message Date
465f8b28c0 Bump version to 3.8.3 2025-07-01 14:28:30 +02:00
27e69af2f3 Update changelog for 8caad00 2025-07-01 14:26:10 +02:00
56e3065950 Document usage of pilot.ilias.studium.kit.edu (#111) 2025-05-30 17:13:45 +02:00
549ce6cce9 Ignore unavailable elements (#119) 2025-05-28 17:04:57 +02:00
34564cedb4 Add support for link collections 2025-05-27 16:25:59 +02:00
2b0d20a1f6 Fix crawling of exercises with instructions
We do not want a second path and the instruction field has an identical
link...
2025-05-26 14:42:38 +02:00
8caad0008d Fix check for nonexistent ilias_url command attribute to base_url (#113) 2025-05-05 22:05:54 +02:00
77a23265a9 Bump version to 3.8.2 2025-04-29 17:55:57 +02:00
4c230ef6dd Fix exercise crawling 2025-04-25 13:45:57 +02:00
b305e1ce23 Fix login using the native ilias login form 2025-04-23 16:08:45 +02:00
bdf17f5c87 Ignore wikis 2025-04-23 16:03:37 +02:00
77fce7daf8 Bump version to 3.8.1 2025-04-17 11:22:35 +02:00
653bf139f0 Fix encoding of descriptions and force images to light mode 2025-04-16 10:52:18 +02:00
3f60638d33 Bump version to 3.8.0 2025-04-16 00:47:05 +02:00
b97b6fae6b Update minimum Python version to 3.11 2025-04-15 21:35:20 +02:00
477234ad0d Support ILIAS 9 2025-04-15 21:35:20 +02:00
63f25277b0 Fix crawling of empty forum threads 2025-03-09 23:44:25 +01:00
c8eff04ae0 Make thread titles link to original ILIAS thread 2025-02-19 16:23:20 +01:00
edc482cdf4 Internalize images in forum threads 2025-02-19 16:23:20 +01:00
72cd0f77e2 Prettify forum thread exports
Co-authored-by: Tim <me@scriptim.dev>
2025-02-19 16:23:20 +01:00
be175f9347 Download only new/updated forum threads 2025-02-19 16:16:37 +01:00
ba2833dba5 Crawl all threads in a forum
Before this patch the row count was unconditionally set to 800. This
patch tries to detect how many rows the forum has and fetches that
amount if it is larger than 800.
2025-02-19 12:19:33 +01:00
2f0e792670 Increase default http timeout to 30
Otherwise larger forums will fail to download in time
2025-02-19 12:19:13 +01:00
5f88539f7e Fix page size increase for forum threads 2025-02-19 12:19:11 +01:00
bd9d7efe64 "Fix" mypy errors
Thank you mypy, very cool. These types make things *so much better*.
They don't just complicate everything and don't really help, because they
cannot detect that an element queried by a tag is not a navigable
string...
2025-02-19 12:15:41 +01:00
16a2dd5b15 fix: totp 2025-02-19 12:15:41 +01:00
678283d341 Use Python facilities to convert paths to file:// urls 2024-11-15 00:09:11 +01:00
287173b0b1 Bump version to 3.7.0 2024-11-13 20:38:27 +01:00
712217e959 Handle groups in cards 2024-11-11 12:53:08 +01:00
6dda4c55a8 Add doctype header to forum threads
This should fix mimetype detection on most systems and is more relevant
now that the report is clickable
2024-11-05 18:36:21 +01:00
596b6a7688 Add support for non-KIT shibboleth login (#98)
Co-authored-by: Mr-Pine <git@mr-pine.de>
Co-authored-by: I-Al-Istannen <I-Al-Istannen@users.noreply.github.com>
2024-11-05 18:30:34 +01:00
Tim 5983200247 Treat headings as folders in kit-ipd crawler (#99) 2024-11-04 23:53:48 +01:00
Tim 26e802d88b Add clickable links to file names in the printed report (#100)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-11-04 00:32:32 +01:00
f5c4e82816 Delay ilias loop detection after transform
This allows users to filter out duplicated elements and suppress the
warning.
2024-11-02 22:46:51 +01:00
f5273f7ca0 Collapse ilias url crawling into normal page crawling 2024-11-02 22:46:51 +01:00
fa71a9f44f Add support for mob videos in page descriptions 2024-10-28 20:35:30 +01:00
81d6ff53c4 Respect row flex in descriptions 2024-10-28 19:41:03 +01:00
d7a2b6e019 Delete videos from course descriptions 2024-10-28 19:41:03 +01:00
71c65e89d1 Internalize images in course descriptions 2024-10-28 19:41:03 +01:00
c1046498e7 Fix download of links without a target URL
They are now downloaded as links to the empty url.
2024-10-28 19:41:03 +01:00
8fbd1978af Fix crawling of nested courses 2024-10-28 18:52:27 +01:00
Tim 739dd95850 Use Last-Modified and ETag headers to determine KIT-IPD file versions (#95)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-10-27 19:03:47 +01:00
c54c3bcfa1 Fix crawling of favorites 2024-10-27 10:50:59 +01:00
25 changed files with 1805 additions and 1046 deletions

View File

@ -14,7 +14,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-13, macos-latest]
python: ["3.9"]
python: ["3.11"]
steps:
- uses: actions/checkout@v4

View File

@ -22,6 +22,73 @@ ambiguous situations.
## Unreleased
## 3.8.3 - 2025-07-01
### Added
- Support for link collections.
In "fancy" mode, a single HTML file with multiple links is generated.
In all other modes, PFERD creates a folder for the collection and a new file
for every link inside.
### Fixed
- Crawling of exercises with instructions
- Don't download unavailable elements.
Elements that are unavailable (for example, because their availability is
time restricted) no longer have the HTML of their info page downloaded.
- `base_url` argument for `ilias-web` crawler causing crashes
## 3.8.2 - 2025-04-29
### Changed
- Explicitly mention that wikis are not supported at the moment and ignore them
### Fixed
- ILIAS-native login
- Exercise crawling
## 3.8.1 - 2025-04-17
### Fixed
- Description HTML files now specify a UTF-8 encoding
- Images in descriptions now always have a white background
## 3.8.0 - 2025-04-16
### Added
- Support for ILIAS 9
### Changed
- Added prettier CSS to forum threads
- Downloaded forum threads now link to the forum instead of the ILIAS thread
- Increase minimum supported Python version to 3.11
- Do not crawl nested courses (courses linked in other courses)
### Fixed
- File links in report on Windows
- TOTP authentication in KIT Shibboleth
- Forum crawling only considering the first 20 entries
## 3.7.0 - 2024-11-13
### Added
- Support for MOB videos in page descriptions
- Clickable links in the report to directly open new/modified/not-deleted files
- Support for non-KIT shibboleth login
### Changed
- Remove videos from description pages
- Perform ILIAS cycle detection after processing the transform to allow
ignoring duplicated elements
- Parse headings (h1-h3) as folders in kit-ipd crawler
### Fixed
- Personal desktop/dashboard/favorites crawling
- Crawling of nested courses
- Downloading of links with no target URL
- Handle row flex on description pages
- Add `<!DOCTYPE html>` heading to forum threads to fix mime type detection
- Handle groups in cards
## 3.6.0 - 2024-10-23
### Added

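As a hedged aside on the "unavailable elements" fix above: ILIAS links such time-restricted elements to their info screen instead of the actual content, and the crawler now skips those URLs (the corresponding check appears in the ilias_web_crawler.py diff further down). A minimal sketch, with a made-up URL:

# Minimal sketch of the "unavailable element" detection; the URL is a hypothetical
# placeholder, the substring check mirrors the one added to _handle_ilias_element.
url = (
    "https://ilias.example.edu/ilias.php?baseClass=ilrepositorygui"
    "&cmdClass=ilInfoScreenGUI&cmd=showSummary&ref_id=12345"
)
is_unavailable = "cmdClass=ilInfoScreenGUI" in url and "cmd=showSummary" in url
print(is_unavailable)  # True -> PFERD ignores the element instead of saving its info page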
View File

@ -163,12 +163,14 @@ out of the box for the corresponding universities:
[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs"
| University | `base_url` | `client_id` |
|---------------|--------------------------------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | Uni_Stuttgart |
| University | `base_url` | `login_type` | `client_id` |
|-----------------|-----------------------------------------|--------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | local | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart |
| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | |
| KIT ILIAS Pilot | https://pilot.ilias.studium.kit.edu | shibboleth | pilot |
If your university isn't listed, try navigating to your instance's login page.
Assuming no custom login service is used, the URL will look something like this:
@ -180,7 +182,11 @@ Assuming no custom login service is used, the URL will look something like this:
If the values work, feel free to submit a PR and add them to the table above.
- `base_url`: The URL where the ILIAS instance is located. (Required)
- `client_id`: An ID used for authentication. (Required)
- `login_type`: How you authenticate. (Required)
- `local`: Use `client_id` for authentication.
- `shibboleth`: Use shibboleth for authentication.
- `client_id`: An ID used for authentication if `login_type` is `local`. Is
ignored if `login_type` is `shibboleth`.
- `target`: The ILIAS element to crawl. (Required)
- `desktop`: Crawl your personal desktop / dashboard
- `<course id>`: Crawl the course with the given id
@ -191,6 +197,8 @@ If the values work, feel free to submit a PR and add them to the table above.
and duplication warnings if you are a member of an ILIAS group. The
`desktop` target is generally preferable.
- `auth`: Name of auth section to use for login. (Required)
- `tfa_auth`: Name of auth section to use for two-factor authentication. Only
uses the auth section's password. (Default: Anonymous `tfa` authenticator)
- `links`: How to represent external links. (Default: `fancy`)
- `ignore`: Don't download links.
- `plaintext`: A text file containing only the URL.

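Putting the options above together, a crawler section could look like the following sketch. The section names, the auth section and its type are illustrative placeholders; the full option reference lives in the configuration documentation.

# Hypothetical pferd.cfg snippet; all names and values are placeholders.
[auth:uni]
type = simple

[crawl:ilias]
type = ilias-web
base_url = https://ilias.uni-koeln.de/ilias
login_type = local
client_id = uk
target = desktop
auth = auth:uni
links = fancy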
View File

@ -1,6 +1,6 @@
Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine, p-fruck
Mr-Pine, p-fruck, PinieP
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -1,4 +1,4 @@
from typing import Optional, Tuple
from typing import Optional, Tuple, cast
import keyring
@ -13,7 +13,7 @@ class KeyringAuthSection(AuthSection):
return self.s.get("username")
def keyring_name(self) -> str:
return self.s.get("keyring_name", fallback=NAME)
return cast(str, self.s.get("keyring_name", fallback=NAME))
class KeyringAuthenticator(Authenticator):

View File

@ -45,8 +45,8 @@ def load(
load_crawler(args, section)
section["type"] = COMMAND_NAME
if args.ilias_url is not None:
section["base_url"] = args.ilias_url
if args.base_url is not None:
section["base_url"] = args.base_url
if args.client_id is not None:
section["client_id"] = args.client_id

View File

@ -149,9 +149,7 @@ class CrawlerSection(Section):
return self.s.getboolean("skip", fallback=False)
def output_dir(self, name: str) -> Path:
# TODO Use removeprefix() after switching to 3.9
if name.startswith("crawl:"):
name = name[len("crawl:"):]
name = name.removeprefix("crawl:")
return Path(self.s.get("output_dir", name)).expanduser()
def redownload(self) -> Redownload:
@ -258,6 +256,10 @@ class Crawler(ABC):
def prev_report(self) -> Optional[Report]:
return self._output_dir.prev_report
@property
def output_dir(self) -> OutputDirectory:
return self._output_dir
@staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
"""
@ -290,9 +292,40 @@ class Crawler(ABC):
log.explain("Answer: Yes")
return CrawlToken(self._limiter, path)
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
log.explain_topic(f"Decision: Should Download {fmt_path(path)}")
if self._transformer.transform(path) is None:
log.explain("Answer: No (ignored)")
return False
should_download = self._output_dir.should_try_download(
path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if should_download:
log.explain("Answer: Yes")
return True
else:
log.explain("Answer: No")
return False
async def download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
@ -307,7 +340,14 @@ class Crawler(ABC):
log.status("[bold bright_black]", "Ignored", fmt_path(path))
return None
fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict)
fs_token = await self._output_dir.download(
path,
transformed_path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if fs_token is None:
log.explain("Answer: No")
return None

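The new `etag_differs`/`mtime` parameters are meant to be fed from HTTP validator headers, as the KIT-IPD crawler now does (commit 739dd95850). A standalone, hedged sketch of obtaining such values with aiohttp, mirroring `_request_resource_version` in the HTTP crawler below; example.org is a placeholder:

# Hedged sketch: fetch ETag / Last-Modified for a URL so a crawler could pass
# etag_differs= and mtime= into should_try_download()/download(). Not PFERD code.
import asyncio
from datetime import datetime

import aiohttp


async def head_version(url: str) -> tuple:
    async with aiohttp.ClientSession() as session:
        async with session.head(url) as resp:
            etag = resp.headers.get("ETag")
            mtime = None
            if last_modified := resp.headers.get("Last-Modified"):
                try:
                    # Format from the Last-Modified spec, same as the crawler uses
                    mtime = datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S GMT")
                except ValueError:
                    pass
            return etag, mtime


print(asyncio.run(head_version("https://example.org/")))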
View File

@ -1,12 +1,14 @@
import asyncio
import http.cookies
import ssl
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple, cast
import aiohttp
import certifi
from aiohttp.client import ClientTimeout
from bs4 import Tag
from ..auth import Authenticator
from ..config import Config
@ -15,10 +17,12 @@ from ..utils import fmt_real_path
from ..version import NAME, VERSION
from .crawler import Crawler, CrawlerSection
ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float:
return self.s.getfloat("http_timeout", fallback=20)
return self.s.getfloat("http_timeout", fallback=30)
class HttpCrawler(Crawler):
@ -169,6 +173,79 @@ class HttpCrawler(Crawler):
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e))
@staticmethod
def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
"""
Retrieves the hierarchy of headings associated with the given file link and constructs a folder
structure from them.
<h1> level headings usually only appear once and serve as the page title, so they would introduce
redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter.
"""
def find_associated_headings(tag: Tag, level: int) -> PurePath:
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = cast(Optional[Tag], tag.find_previous(name=f"h{level}"))
if level_heading is None:
return find_associated_headings(tag, level - 1)
folder_name = level_heading.get_text().strip()
return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names
return find_associated_headings(file_link, 3)
def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
"""
If available, retrieves the entity tag for a given path which was stored in the previous report.
"""
if not self._output_dir.prev_report:
return None
etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
return etags.get(str(path))
def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None:
"""
Adds an entity tag for a given path to the report's custom values.
"""
if not etag:
return
etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the corresponding value will be None.
"""
try:
async with self.session.head(resource_url) as resp:
if resp.status != 200:
return None, None
etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified")
last_modified = None
if last_modified_header:
try:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives
datetime_format = "%a, %d %b %Y %H:%M:%S GMT"
last_modified = datetime.strptime(last_modified_header, datetime_format)
except ValueError:
# last_modified remains None
pass
return etag_header, last_modified
except aiohttp.ClientError:
return None, None
async def run(self) -> None:
self._request_count = 0
self._cookie_jar = aiohttp.CookieJar()
@ -186,7 +263,12 @@ class HttpCrawler(Crawler):
connect=self._http_timeout,
sock_connect=self._http_timeout,
sock_read=self._http_timeout,
)
),
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False
) as session:
self.session = session
try:

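A hedged usage sketch for the new heading-hierarchy helper above; the module path is assumed from the package layout and the HTML is a made-up kit-ipd style page.

# Hedged sketch: headings above a file link become its folder structure.
from bs4 import BeautifulSoup

from PFERD.crawl.http_crawler import HttpCrawler  # assumed module path

html = """
<h1>Lecture</h1>
<h2>Exercises</h2>
<h3>Sheet 1</h3>
<p><a id="target" href="sheet1.pdf">sheet1.pdf</a></p>
"""
link = BeautifulSoup(html, "html.parser").find("a", id="target")
# drop_h1=True skips the page title, yielding PurePath("Exercises/Sheet 1")
print(HttpCrawler.get_folder_structure_from_heading_hierarchy(link, drop_h1=True))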
View File

@ -25,9 +25,10 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler
last_exception = e
log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
log.explain(f"Last exception: {last_exception!r}")
if last_exception:
message = f"Error in I/O Operation: {last_exception}"
message = f"Error in I/O Operation: {last_exception!r}"
if failure_is_error:
raise CrawlError(message) from last_exception
else:

View File

@ -1,5 +1,7 @@
import dataclasses
import re
from enum import Enum
from typing import Optional
from typing import Optional, cast
import bs4
@ -12,7 +14,9 @@ _link_template_fancy = """
<head>
<meta charset="UTF-8">
<title>ILIAS - Link: {{name}}</title>
<!-- REPEAT REMOVE START -->
<meta http-equiv = "refresh" content = "{{redirect_delay}}; url = {{link}}" />
<!-- REPEAT REMOVE END -->
</head>
<style>
@ -23,6 +27,8 @@ _link_template_fancy = """
display: flex;
align-items: center;
justify-content: center;
flex-direction: column;
gap: 4px;
}
body {
padding: 0;
@ -31,11 +37,16 @@ _link_template_fancy = """
font-family: "Open Sans", Verdana, Arial, Helvetica, sans-serif;
height: 100vh;
}
.row {
background-color: white;
.column {
min-width: 500px;
max-width: 90vw;
display: flex;
flex-direction: column;
row-gap: 5px;
}
.row {
background-color: white;
display: flex;
padding: 1em;
}
.logo {
@ -75,6 +86,8 @@ _link_template_fancy = """
}
</style>
<body class="center-flex">
<div class="column">
<!-- REPEAT START -->
<div class="row">
<div class="logo center-flex">
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24">
@ -89,6 +102,8 @@ _link_template_fancy = """
</div>
<div class="menu-button center-flex"> ⯆ </div>
</div>
<!-- REPEAT END -->
</div>
</body>
</html>
""".strip() # noqa: E501 line too long
@ -126,6 +141,88 @@ _learning_module_template = """
</html>
"""
_forum_thread_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ILIAS - Forum: {{name}}</title>
<style>
* {
box-sizing: border-box;
}
body {
font-family: 'Open Sans', Verdana, Arial, Helvetica, sans-serif;
padding: 8px;
}
ul, ol, p {
margin: 1.2em 0;
}
p {
margin-top: 8px;
margin-bottom: 8px;
}
a {
color: #00876c;
text-decoration: none;
cursor: pointer;
}
a:hover {
text-decoration: underline;
}
body > p:first-child > span:first-child {
font-size: 1.6em;
}
body > p:first-child > span:first-child ~ span.default {
display: inline-block;
font-size: 1.2em;
padding-bottom: 8px;
}
.ilFrmPostContent {
margin-top: 8px;
max-width: 64em;
}
.ilFrmPostContent > *:first-child {
margin-top: 0px;
}
.ilFrmPostTitle {
margin-top: 24px;
color: #00876c;
font-weight: bold;
}
#ilFrmPostList {
list-style: none;
padding-left: 0;
}
li.ilFrmPostRow {
padding: 3px 0 3px 3px;
margin-bottom: 24px;
border-left: 6px solid #dddddd;
}
.ilFrmPostRow > div {
display: flex;
}
.ilFrmPostImage img {
margin: 0 !important;
padding: 6px 9px 9px 6px;
}
.ilUserIcon {
width: 115px;
}
.small {
text-decoration: none;
font-size: 0.75rem;
color: #6f6f6f;
}
</style>
</head>
<body>
{{heading}}
{{content}}
</body>
</html>
""".strip() # noqa: E501 line too long
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those.
@ -139,13 +236,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
</div>
"""
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip()
text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip()
left = f'<a href="{prev}">{text}</a>'
else:
left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip()
text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip()
right = f'<a href="{next}">{text}</a>'
else:
right = "<span></span>"
@ -160,8 +257,24 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
"{{left}}", left).replace("{{right}}", right).encode())
)
body = body.prettify()
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name)
body_str = cast(str, body.prettify())
return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name)
def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str:
if title := cast(Optional[bs4.Tag], heading.find(name="b")):
title.wrap(bs4.Tag(name="a", attrs={"href": url}))
return _forum_thread_template \
.replace("{{name}}", name) \
.replace("{{heading}}", cast(str, heading.prettify())) \
.replace("{{content}}", cast(str, content.prettify()))
@dataclasses.dataclass
class LinkData:
name: str
url: str
description: str
class Links(Enum):
@ -181,6 +294,11 @@ class Links(Enum):
return None
raise ValueError("Missing switch case")
def collection_as_one(self) -> bool:
if self == Links.FANCY:
return True
return False
def extension(self) -> Optional[str]:
if self == Links.FANCY:
return ".html"
@ -192,10 +310,48 @@ class Links(Enum):
return None
raise ValueError("Missing switch case")
def interpolate(self, redirect_delay: int, collection_name: str, links: list[LinkData]) -> str:
template = self.template()
if template is None:
raise ValueError("Cannot interpolate ignored links")
if len(links) == 1:
link = links[0]
content = template
content = content.replace("{{link}}", link.url)
content = content.replace("{{name}}", link.name)
content = content.replace("{{description}}", link.description)
content = content.replace("{{redirect_delay}}", str(redirect_delay))
return content
if self == Links.PLAINTEXT or self == Links.INTERNET_SHORTCUT:
return "\n".join(f"{link.url}" for link in links)
# All others get coerced to fancy
content = cast(str, Links.FANCY.template())
repeated_content = cast(
re.Match[str],
re.search(r"<!-- REPEAT START -->([\s\S]+)<!-- REPEAT END -->", content)
).group(1)
parts = []
for link in links:
instance = repeated_content
instance = instance.replace("{{link}}", link.url)
instance = instance.replace("{{name}}", link.name)
instance = instance.replace("{{description}}", link.description)
instance = instance.replace("{{redirect_delay}}", str(redirect_delay))
parts.append(instance)
content = content.replace(repeated_content, "\n".join(parts))
content = content.replace("{{name}}", collection_name)
content = re.sub(r"<!-- REPEAT REMOVE START -->[\s\S]+<!-- REPEAT REMOVE END -->", "", content)
return content
@staticmethod
def from_string(string: str) -> "Links":
try:
return Links(string)
except ValueError:
raise ValueError("must be one of 'ignore', 'plaintext',"
" 'html', 'internet-shortcut'")
options = [f"'{option.value}'" for option in Links]
raise ValueError(f"must be one of {', '.join(options)}")

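A hedged sketch of how the new `LinkData`/`Links.interpolate` pair renders a link collection; the import path is assumed and the URLs are placeholders.

# Hedged sketch: render a link collection the way _download_link now does.
from PFERD.crawl.ilias.file_templates import LinkData, Links  # assumed module path

links = [
    LinkData(name="Lecture website", url="https://example.org/lecture", description="Course homepage"),
    LinkData(name="Exercise sheet 1", url="https://example.org/sheet1.pdf", description=""),
]

# "fancy" keeps the whole collection in one HTML file, repeating the link row ...
html = Links.FANCY.interpolate(redirect_delay=5, collection_name="Useful links", links=links)

# ... while e.g. "plaintext" is rendered per link, so the crawler writes one file
# per link into a folder named after the collection.
text = Links.PLAINTEXT.interpolate(redirect_delay=5, collection_name="Useful links", links=[links[0]])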
View File

@ -1,3 +1,5 @@
from typing import cast
from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """
@ -12,6 +14,13 @@ _STYLE_TAG_CONTENT = """
font-weight: bold;
}
.row-flex {
display: flex;
}
.row-flex-wrap {
flex-wrap: wrap;
}
.accordion-head {
background-color: #f5f7fa;
padding: 0.5rem 0;
@ -30,6 +39,10 @@ _STYLE_TAG_CONTENT = """
margin: 0.5rem 0;
}
img {
background-color: white;
}
body {
padding: 1em;
grid-template-columns: 1fr min(60rem, 90%) 1fr;
@ -47,12 +60,11 @@ _ARTICLE_WORTHY_CLASSES = [
def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
head = soup.new_tag("head")
soup.insert(0, head)
# Force UTF-8 encoding
head.append(soup.new_tag("meta", charset="utf-8"))
simplecss_link: Tag = soup.new_tag("link")
# <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css">
simplecss_link["rel"] = "stylesheet"
simplecss_link["href"] = "https://cdn.simplecss.org/simple.css"
head.append(simplecss_link)
head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css"))
# Basic style tags for compat
style: Tag = soup.new_tag("style")
@ -63,18 +75,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES):
for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)):
block.name = "article"
for block in soup.find_all("h3"):
for block in cast(list[Tag], soup.find_all("h3")):
block.name = "div"
for block in soup.find_all("h1"):
for block in cast(list[Tag], soup.find_all("h1")):
block.name = "h3"
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"):
for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")):
block.name = "h3"
block["class"] += ["accordion-head"]
block["class"] += ["accordion-head"] # type: ignore
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children)
@ -85,7 +97,12 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if isinstance(type(children[0]), Comment):
dummy.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
# Delete video figures, as they can not be internalized anyway
for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"):
if figure := video.find_parent("figure"):
figure.decompose()
for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")):
hrule_imposter.insert(0, soup.new_tag("hr"))
return soup

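A hedged usage sketch for the cleaning helpers above (import path assumed): description pages are passed through `insert_base_markup` and `clean` before being written, which gives them the UTF-8 charset, simple.css and the white image background introduced in this diff.

# Hedged sketch, not PFERD code: clean a tiny description snippet.
from bs4 import BeautifulSoup

from PFERD.crawl.ilias.ilias_html_cleaner import clean, insert_base_markup  # assumed path

soup = BeautifulSoup(
    "<h1>Course description</h1><div class='ilc_section_Separator'></div>",
    "html.parser",
)
print(clean(insert_base_markup(soup)).prettify())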
View File

@ -19,14 +19,20 @@ from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .async_helper import _iorepeat
from .file_templates import Links, learning_module_template
from .file_templates import LinkData, Links, forum_thread_template, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
class LoginTypeLocal:
def __init__(self, client_id: str):
self.client_id = client_id
class IliasWebCrawlerSection(HttpCrawlerSection):
def base_url(self) -> str:
base_url = self.s.get("base_url")
@ -35,12 +41,30 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url
def client_id(self) -> str:
def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
if login_type == "shibboleth":
return "shibboleth"
if login_type == "local":
client_id = self.s.get("client_id")
if not client_id:
self.missing_value("client_id")
return LoginTypeLocal(client_id)
return client_id
self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def target(self) -> TargetType:
target = self.s.get("target")
@ -83,21 +107,22 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
_DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.EXERCISE_OVERVIEW,
IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEETING,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
_VIDEO_ELEMENTS: Set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.OPENCAST_VIDEO_PLAYER,
}
@ -155,7 +180,13 @@ instance's greatest bottleneck.
self._auth = auth
self._base_url = section.base_url()
self._client_id = section.client_id()
self._tfa_auth = section.tfa_auth(authenticators)
self._login_type = section.login()
if isinstance(self._login_type, LoginTypeLocal):
self._client_id = self._login_type.client_id
else:
self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth)
self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay()
@ -178,94 +209,55 @@ instance's greatest bottleneck.
async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course
root_url = url_set_query_param(
urljoin(self._base_url, "/goto.php"),
urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}",
)
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items"
appendix = appendix.encode("ASCII").hex()
await self._crawl_url(url_set_query_param(
urljoin(self._base_url, "/gs_content.php"),
"item=", appendix,
))
await self._crawl_url(
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"),
crawl_nested_courses=True
)
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
while next_stage_url:
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
async def _crawl_url(
self,
url: str,
expected_id: Optional[int] = None,
crawl_nested_courses: bool = False
) -> None:
if awaitable := await self._handle_ilias_page(
url, None, PurePath("."), expected_id, crawl_nested_courses
):
await awaitable
async def _handle_ilias_page(
self,
url: str,
parent: IliasPageElement,
current_element: Optional[IliasPageElement],
path: PurePath,
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
return None
return self._crawl_ilias_page(url, parent, maybe_cl)
if current_element:
self._ensure_not_seen(current_element, path)
return self._crawl_ilias_page(
url, current_element, maybe_cl, expected_course_id, crawl_nested_courses
)
@anoncritical
async def _crawl_ilias_page(
self,
url: str,
parent: IliasPageElement,
current_element: Optional[IliasPageElement],
cl: CrawlToken,
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False,
) -> None:
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
@ -276,19 +268,30 @@ instance's greatest bottleneck.
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = parent
current_parent = current_element
page = None
while next_stage_url:
soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
# If we expect to find a root course, enforce it
if current_parent is None and expected_course_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs/" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
if str(expected_course_id) not in perma_link:
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
page = IliasPage(soup, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
page = cast(IliasPage, page)
elements.extend(page.get_child_elements())
if description_string := page.get_description():
description.append(description_string)
@ -303,7 +306,7 @@ instance's greatest bottleneck.
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element):
if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses):
tasks.append(asyncio.create_task(handle))
# And execute them
@ -319,20 +322,22 @@ instance's greatest bottleneck.
self,
parent_path: PurePath,
element: IliasPageElement,
crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
# element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
# directory escape attacks.
element_path = PurePath(parent_path, element.name)
# This is symptomatic of no access to the element, for example, because
# of time availability restrictions.
if "cmdClass=ilInfoScreenGUI" in element.url and "cmd=showSummary" in element.url:
log.explain(
"Skipping element as url points to info screen, "
"this should only happen with not-yet-released elements"
)
return None
if element.type in _VIDEO_ELEMENTS:
if not self._videos:
log.status(
@ -379,10 +384,70 @@ instance's greatest bottleneck.
"[bright_black](scorm learning modules are not supported)"
)
return None
elif element.type == IliasElementType.LITERATURE_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](literature lists are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](HTML learning modules are not supported)"
)
return None
elif element.type == IliasElementType.BLOG:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](blogs are not currently supported)"
)
return None
elif element.type == IliasElementType.DCL_RECORD_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](dcl record lists are not currently supported)"
)
return None
elif element.type == IliasElementType.MEDIA_POOL:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](media pools are not currently supported)"
)
return None
elif element.type == IliasElementType.COURSE:
if crawl_nested_courses:
return await self._handle_ilias_page(element.url, element, element_path)
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](not descending into linked course)"
)
return None
elif element.type == IliasElementType.WIKI:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](wikis are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK:
return await self._handle_link(element, element_path)
elif element.type == IliasElementType.LINK_COLLECTION:
return await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO:
@ -391,6 +456,8 @@ instance's greatest bottleneck.
return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.MOB_VIDEO:
return await self._handle_file(element, element_path, is_video=True)
elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path)
else:
@ -406,45 +473,98 @@ instance's greatest bottleneck.
log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
link_template_maybe = self._links.template()
link_extension = self._links.extension()
if not link_template_maybe or not link_extension:
export_url = url_set_query_param(element.url, "cmd", "exportHTML")
resolved = await self._resolve_link_target(export_url)
if resolved == "none":
links = [LinkData(element.name, "", element.description or "")]
else:
links = self._parse_link_content(element, cast(BeautifulSoup, resolved))
maybe_extension = self._links.extension()
if not maybe_extension:
log.explain("Answer: No")
return None
else:
log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension)
if len(links) <= 1 or self._links.collection_as_one():
element_path = element_path.with_name(element_path.name + maybe_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_link(self._links, element.name, links, maybe_dl)
return self._download_link(element, link_template_maybe, maybe_dl)
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
# Required for download_all closure
cl = maybe_cl
extension = maybe_extension
async def download_all() -> None:
for link in links:
path = cl.path / (_sanitize_path_name(link.name) + extension)
if dl := await self.download(path, mtime=element.mtime):
await self._download_link(self._links, element.name, [link], dl)
return download_all()
@anoncritical
@_iorepeat(3, "resolving link")
async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None:
async with dl as (bar, sink):
export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML")
real_url = await self._resolve_link_target(export_url)
self._write_link_content(link_template, real_url, element.name, element.description, sink)
def _write_link_content(
async def _download_link(
self,
link_template: str,
url: str,
name: str,
description: Optional[str],
sink: FileSink,
link_renderer: Links,
collection_name: str,
links: list[LinkData],
dl: DownloadToken
) -> None:
content = link_template
content = content.replace("{{link}}", url)
content = content.replace("{{name}}", name)
content = content.replace("{{description}}", str(description))
content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay))
sink.file.write(content.encode("utf-8"))
async with dl as (bar, sink):
rendered = link_renderer.interpolate(self._link_file_redirect_delay, collection_name, links)
sink.file.write(rendered.encode("utf-8"))
sink.done()
async def _resolve_link_target(self, export_url: str) -> Union[BeautifulSoup, Literal['none']]:
async def impl() -> Optional[Union[BeautifulSoup, Literal['none']]]:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()) # .select_one("a").get("href").strip() # type: ignore
# We are either unauthenticated or the link is not active
new_url = resp.headers[hdrs.LOCATION].lower()
if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url:
return "none"
return None
auth_id = await self._current_auth_id()
target = await impl()
if target is not None:
return target
await self.authenticate(auth_id)
target = await impl()
if target is not None:
return target
raise CrawlError("resolve_link_target failed even after authenticating")
@staticmethod
def _parse_link_content(element: IliasPageElement, content: BeautifulSoup) -> list[LinkData]:
links = cast(list[Tag], list(content.select("a")))
if len(links) == 1:
url = str(links[0].get("href")).strip()
return [LinkData(name=element.name, description=element.description or "", url=url)]
results = []
for link in links:
url = str(link.get("href")).strip()
name = link.get_text(strip=True)
description = cast(Tag, link.find_next_sibling("dd")).get_text(strip=True)
results.append(LinkData(name=name, description=description, url=url.strip()))
return results
async def _handle_booking(
self,
element: IliasPageElement,
@ -466,7 +586,9 @@ instance's greatest bottleneck.
if not maybe_dl:
return None
return self._download_booking(element, link_template_maybe, maybe_dl)
self._ensure_not_seen(element, element_path)
return self._download_booking(element, maybe_dl)
@anoncritical
@_iorepeat(1, "downloading description")
@ -476,9 +598,10 @@ instance's greatest bottleneck.
if not dl:
return
async with dl as (bar, sink):
async with dl as (_bar, sink):
description = clean(insert_base_markup(description))
sink.file.write(description.prettify().encode("utf-8"))
description_tag = await self.internalize_images(description)
sink.file.write(cast(str, description_tag.prettify()).encode("utf-8"))
sink.done()
@anoncritical
@ -486,26 +609,13 @@ instance's greatest bottleneck.
async def _download_booking(
self,
element: IliasPageElement,
link_template: str,
dl: DownloadToken,
) -> None:
async with dl as (bar, sink):
self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
await self._authenticate()
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
raise CrawlError("resolve_link_target failed even after authenticating")
links = [LinkData(name=element.name, description=element.description or "", url=element.url)]
rendered = self._links.interpolate(self._link_file_redirect_delay, element.name, links)
sink.file.write(rendered.encode("utf-8"))
sink.done()
async def _handle_opencast_video(
self,
@ -530,6 +640,8 @@ instance's greatest bottleneck.
if not maybe_dl:
return None
self._ensure_not_seen(element, element_path)
# If we have every file from the cached mapping already, we can ignore this and bail
if self._all_opencast_videos_locally_present(element, maybe_dl.path):
# Mark all existing videos as known to ensure they do not get deleted during cleanup.
@ -590,7 +702,7 @@ instance's greatest bottleneck.
)
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
page = IliasPage(await self._get_page(element.url), element)
stream_elements = page.get_child_elements()
if len(stream_elements) > 1:
@ -600,7 +712,7 @@ instance's greatest bottleneck.
stream_element = stream_elements[0]
# We do not have a local cache yet
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
await self._stream_from_url(stream_element, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))])
return
@ -615,7 +727,7 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
await self._stream_from_url(stream_element, sink, bar, is_video=True)
add_to_report(contained_video_paths)
@ -623,23 +735,33 @@ instance's greatest bottleneck.
self,
element: IliasPageElement,
element_path: PurePath,
is_video: bool = False,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_file(element, maybe_dl)
self._ensure_not_seen(element, element_path)
return self._download_file(element, maybe_dl, is_video)
@_iorepeat(3, "downloading file")
@anoncritical
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False)
await self._stream_from_url(element, sink, bar, is_video)
async def _stream_from_url(
self,
element: IliasPageElement,
sink: FileSink,
bar: ProgressBar,
is_video: bool
) -> None:
url = element.url
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool:
next_url = url
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
# we can not match on the content type here. Instead, we disallow redirects and inspect the
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
@ -663,6 +785,13 @@ instance's greatest bottleneck.
if is_video and "html" in resp.content_type:
return False
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None):
parts = content_range.split("/")
if len(parts) == 2 and parts[1].isdigit():
bar.set_total(int(parts[1]))
# Prefer the content length header
if resp.content_length:
bar.set_total(resp.content_length)
@ -680,7 +809,7 @@ instance's greatest bottleneck.
await self.authenticate(auth_id)
if not await try_stream():
raise CrawlError("File streaming failed after authenticate()")
raise CrawlError(f"File streaming failed after authenticate() {element!r}")
async def _handle_forum(
self,
@ -695,36 +824,23 @@ instance's greatest bottleneck.
@_iorepeat(3, "crawling forum")
@anoncritical
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasForumThread] = []
async with cl:
next_stage_url = element.url
while next_stage_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element)
if next := page.get_next_stage_element():
next_stage_url = next.url
else:
break
download_data = page.get_download_forum_data()
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
inner = IliasPage(await self._get_page(element.url), element)
export_url = inner.get_forum_export_url()
if not export_url:
log.warn("Could not extract forum export url")
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
elements.sort(key=lambda elem: elem.title)
export = await self._post(export_url, {
"format": "html",
"cmd[createExportFile]": ""
})
elements = parse_ilias_forum_export(soupify(export))
tasks: List[Awaitable[None]] = []
for elem in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem)))
for thread in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url)))
# And execute them
await self.gather(tasks)
@ -734,17 +850,22 @@ instance's greatest bottleneck.
async def _download_forum_thread(
self,
parent_path: PurePath,
element: IliasForumThread,
thread: Union[IliasForumThread, IliasPageElement],
forum_url: str
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime)
if not maybe_dl:
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=thread.mtime)
if not maybe_dl or not isinstance(thread, IliasForumThread):
return
async with maybe_dl as (bar, sink):
content = element.title_tag.prettify()
content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8"))
rendered = forum_thread_template(
thread.name,
forum_url,
thread.name_tag,
await self.internalize_images(thread.content_tag)
)
sink.file.write(rendered.encode("utf-8"))
sink.done()
async def _handle_learning_module(
@ -755,6 +876,8 @@ instance's greatest bottleneck.
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
self._ensure_not_seen(element, element_path)
return self._crawl_learning_module(element, maybe_cl)
@_iorepeat(3, "crawling learning module")
@ -766,7 +889,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element)
page = IliasPage(soup, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element
@ -809,7 +932,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element)
page = IliasPage(soup, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
@ -840,13 +963,13 @@ instance's greatest bottleneck.
if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p:
prev = os.path.relpath(prev_p, my_path.parent)
prev = cast(str, os.path.relpath(prev_p, my_path.parent))
else:
prev = None
if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p:
next = os.path.relpath(next_p, my_path.parent)
next = cast(str, os.path.relpath(next_p, my_path.parent))
else:
next = None
@ -866,21 +989,30 @@ instance's greatest bottleneck.
continue
if elem.name == "img":
if src := elem.attrs.get("src", None):
url = urljoin(self._base_url, src)
url = urljoin(self._base_url, cast(str, src))
if not url.startswith(self._base_url):
continue
log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"):
if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"):
# For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + elem.attrs["src"]
elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"])
return tag
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup:
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
soup = soupify(await request.read())
soup = IliasSoup(soupify(await request.read()), str(request.url))
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
@ -889,13 +1021,13 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request:
soup = soupify(await request.read())
soup = IliasSoup(soupify(await request.read()), str(request.url))
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
@staticmethod
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError(
"Unexpectedly encountered ILIAS root page. "
@ -907,29 +1039,19 @@ instance's greatest bottleneck.
)
return soup
async def _post_authenticated(
async def _post(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> bytes:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
async with self.session.post(url, data=form_data(), allow_redirects=False) as request:
async with self.session.post(url, data=form_data()) as request:
if request.status == 200:
return await request.read()
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.post(url, data=data, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
raise CrawlError(f"post failed with status {request.status}")
async def _get_authenticated(self, url: str) -> bytes:
auth_id = await self._current_auth_id()
@ -947,10 +1069,11 @@ instance's greatest bottleneck.
return await request.read()
raise CrawlError("get_authenticated failed even after authenticating")
# ToDo: Is iorepeat still required?
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
# fill the session with the correct cookies
if self._login_type == "shibboleth":
await self._shibboleth_login.login(self.session)
else:
params = {
"client_id": self._client_id,
"cmd": "force_login",
@ -958,52 +1081,22 @@ instance's greatest bottleneck.
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
login_form = login_page.find("form", attrs={"name": "formlogin"})
login_form = cast(Optional[Tag], login_page.find("form", attrs={"name": "login_form"}))
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action")
login_url = cast(Optional[str], login_form.attrs.get("action"))
if login_url is None:
raise CrawlError("Could not find the action URL in the login form!")
username, password = await self._auth.credentials()
login_data = {
"username": username,
"password": password,
"cmd[doStandardAuthentication]": "Login",
}
login_form_data = aiohttp.FormData()
login_form_data.add_field('login_form/input_3/input_4', username)
login_form_data.add_field('login_form/input_3/input_5', password)
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read())
if not self._is_logged_in(soup):
async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request:
soup = IliasSoup(soupify(await request.read()), str(request.url))
if not IliasPage.is_logged_in(soup):
self._auth.invalidate_credentials()
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
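Side note on the `@_iorepeat(3, "Login", failure_is_error=True)` decorator used above: `_iorepeat` itself lives in `async_helper.py` and is not part of this excerpt. As a rough sketch of the idea only (names, backoff and error handling are assumptions, and the simplified version below omits the real helper's `failure_is_error` flag):

```python
import asyncio
from functools import wraps
from typing import Awaitable, Callable, Optional, TypeVar

import aiohttp

T = TypeVar("T")


def iorepeat(attempts: int, name: str) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    """Retry the wrapped coroutine on transient aiohttp connection errors."""

    def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(fn)
        async def wrapper(*args: object, **kwargs: object) -> T:
            last_exc: Optional[Exception] = None
            for attempt in range(attempts):
                try:
                    return await fn(*args, **kwargs)
                except aiohttp.ClientConnectionError as e:
                    last_exc = e
                    await asyncio.sleep(2 ** attempt)  # crude backoff between tries
            raise RuntimeError(f"{name} failed after {attempts} attempts") from last_exc

        return wrapper

    return decorator
```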

File diff suppressed because it is too large

View File

@@ -1,23 +1,14 @@
from typing import Any, Dict, Optional, Union
from typing import Dict, Literal
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...auth import Authenticator
from ...config import Config
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError, CrawlWarning
from .async_helper import _iorepeat
from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
TargetType = Union[str, int]
from .shibboleth_login import ShibbolethLogin
_ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful():
class KitShibbolethBackgroundLoginSuccessful:
pass
@@ -25,19 +16,8 @@ class KitIliasWebCrawlerSection(IliasWebCrawlerSection):
def base_url(self) -> str:
return _ILIAS_URL
def client_id(self) -> str:
# KIT ILIAS uses the Shibboleth service for authentication. There's no
# use for a client id.
return "unused"
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def login(self) -> Literal["shibboleth"]:
return "shibboleth"
class KitIliasWebCrawler(IliasWebCrawler):
@@ -46,184 +26,12 @@ class KitIliasWebCrawler(IliasWebCrawler):
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator]
authenticators: Dict[str, Authenticator],
):
super().__init__(name, section, config, authenticators)
self._shibboleth_login = KitShibbolethLogin(
self._shibboleth_login = ShibbolethLogin(
_ILIAS_URL,
self._auth,
section.tfa_auth(authenticators),
)
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, so the repeat is likely not critical here.
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
class KitShibbolethLogin:
"""
Login via KIT's shibboleth system.
"""
def __init__(self, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]) -> None:
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receives.
This function should only be called when it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{_ILIAS_URL}/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"csrf_token": csrf_token
}
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
"csrf_token": csrf_token
}
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
aiohttp unescapes '/' and ':' in URL query parameters, which is not RFC compliant and is rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling the location header was a good idea??
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())
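The docstring above is the reason `_shib_post` exists at all: handing the `location` header back to aiohttp as a plain string lets yarl requote it, while wrapping it as an already-encoded `yarl.URL` keeps the query untouched. A standalone illustration of that difference (the concrete URL is made up, and yarl's exact requoting behaviour may vary by version):

```python
import yarl

# A redirect target the IdP might send, with ':' and '/' percent-encoded in the query.
location = (
    "https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO"
    "?target=https%3A%2F%2Filias.studium.kit.edu%2Fshib_login.php"
)

kept = yarl.URL(location, encoded=True)  # passed through byte-for-byte
requoted = yarl.URL(location)            # yarl may decode %3A and %2F again here

print(str(kept))
print(str(requoted))
```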

View File

@@ -0,0 +1,129 @@
from typing import Any, Optional, cast
import aiohttp
import yarl
from bs4 import BeautifulSoup, Tag
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError
class ShibbolethLogin:
"""
Login via shibboleth system.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receives.
This function should only be called when it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{self._ilias_url}/shib_login.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"fudis_web_authn_assertion_input": "",
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = soup.find("form", {"method": "post"})["action"]  # type: ignore
data = { # using the info obtained in the while loop above
"RelayState": cast(str, relay_state["value"]),
"SAMLResponse": cast(str, saml_response["value"]),
}
await sess.post(cast(str, url), data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"fudis_otp_input": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="fudiscr-form") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
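For context, a minimal usage sketch of the new class. The import path and the stand-in credentials object are assumptions; PFERD normally builds its authenticators from the config file and passes a real `Authenticator`:

```python
import asyncio

import aiohttp

from PFERD.crawl.ilias.shibboleth_login import ShibbolethLogin  # import path is an assumption


class StaticAuth:
    """Tiny stand-in exposing only the two methods ShibbolethLogin uses above."""

    async def credentials(self) -> tuple[str, str]:
        return "uxxxx", "correct horse battery staple"

    def invalidate_credentials(self) -> None:
        raise RuntimeError("Shibboleth rejected the supplied credentials")


async def main() -> None:
    login = ShibbolethLogin("https://ilias.studium.kit.edu", StaticAuth(), None)
    async with aiohttp.ClientSession() as session:
        # Performs the SAML dance and leaves the ILIAS cookies in the session's cookie jar.
        await login.login(session)


asyncio.run(main())
```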

View File

@@ -1,8 +1,9 @@
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@@ -31,24 +32,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return re.compile(regex)
@dataclass(unsafe_hash=True)
@dataclass
class KitIpdFile:
name: str
url: str
def explain(self) -> None:
log.explain(f"File {self.name!r} (href={self.url!r})")
@dataclass
class KitIpdFolder:
name: str
files: List[KitIpdFile]
entries: List[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r} (href={file.url!r})")
def __hash__(self) -> int:
return self.name.__hash__()
for entry in self.entries:
entry.explain()
class KitIpdCrawler(HttpCrawler):
@@ -72,81 +73,96 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_cl:
for item in await self._fetch_items():
item.explain()
if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item))
tasks.append(self._crawl_folder(PurePath("."), item))
else:
# Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item))
log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})")
log.explain("Attributing it to root folder")
# do this here to at least be sequential and not parallel (rate limiting is hard, as the
# crawl abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(item.url)
tasks.append(self._download_file(PurePath("."), item, etag, mtime))
await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None:
path = parent / folder.name
if not await self.crawl(path):
return
tasks = [self._download_file(path, file) for file in folder.files]
tasks = []
for entry in folder.entries:
if isinstance(entry, KitIpdFolder):
tasks.append(self._crawl_folder(path, entry))
else:
# do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
# abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(entry.url)
tasks.append(self._download_file(path, entry, etag, mtime))
await self.gather(tasks)
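`_request_resource_version` comes from the shared `HttpCrawler` and its body is not part of this diff; conceptually it has to yield the two validators (ETag and mtime) consumed above. A hedged sketch of such a probe, with the signature and behaviour as assumptions:

```python
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Optional, Tuple

import aiohttp


async def request_resource_version(
    session: aiohttp.ClientSession, url: str
) -> Tuple[Optional[str], Optional[datetime]]:
    """Fetch the ETag and Last-Modified validators of `url` without downloading the body."""
    try:
        async with session.head(url, allow_redirects=True) as resp:
            etag = resp.headers.get("ETag")
            last_modified = resp.headers.get("Last-Modified")
            mtime = parsedate_to_datetime(last_modified) if last_modified else None
            return etag, mtime
    except aiohttp.ClientError:
        # An unreachable resource simply yields "unknown version".
        return None, None
```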
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
async def _download_file(
self,
parent: PurePath,
file: KitIpdFile,
etag: Optional[str],
mtime: Optional[datetime]
) -> None:
element_path = parent / file.name
maybe_dl = await self.download(element_path)
prev_etag = self._get_previous_etag_from_report(element_path)
etag_differs = None if prev_etag is None else prev_etag != etag
maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
if not maybe_dl:
# keep storing the known file's etag
if prev_etag:
self._add_etag_to_report(element_path, prev_etag)
return
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
await self._stream_from_url(file.url, element_path, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
folder_tree: KitIpdFolder = KitIpdFolder(".", [])
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1)
file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
return items
current_folder: KitIpdFolder = folder_tree
for folder_name in parent.parts:
# helps the type checker to verify that current_folder is indeed a folder
def subfolders() -> Generator[KitIpdFolder, Any, None]:
return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
if not any(entry.name == folder_name for entry in subfolders()):
current_folder.entries.append(KitIpdFolder(folder_name, []))
current_folder = next(entry for entry in subfolders() if entry.name == folder_name)
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
current_folder.entries.append(file)
return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
return folder_tree.entries
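`get_folder_structure_from_heading_hierarchy` is defined on the shared `HttpCrawler` and not shown here; the idea is to turn the `<h1>`/`<h2>`/`<h3>` headings preceding a link into nested path segments, optionally dropping a lone `<h1>`. A rough sketch of that idea only (the real helper may differ in detail):

```python
import re
from pathlib import PurePath

from bs4 import Tag

_HEADING_RE = re.compile(r"^h[1-6]$")


def folder_from_headings(link: Tag, drop_h1: bool) -> PurePath:
    """Collect the nearest enclosing heading of each level above `link`."""
    collected: list[tuple[int, str]] = []
    smallest_level = 7  # "no heading seen yet"
    for heading in link.find_all_previous(_HEADING_RE):
        level = int(heading.name[1])
        # Walking backwards through the document, a heading is an ancestor
        # only if it sits higher in the hierarchy than everything collected so far.
        if level < smallest_level:
            smallest_level = level
            collected.append((level, heading.get_text().strip()))
    collected.reverse()  # outermost heading first
    if drop_h1:
        collected = [(level, text) for level, text in collected if level > 1]
    return PurePath(*(text for _, text in collected))
```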
def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link)
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]:
return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex}))
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href"))
return urljoin(url, cast(str, link_tag.get("href")))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
@@ -159,6 +175,8 @@ class KitIpdCrawler(HttpCrawler):
sink.done()
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some

View File

@@ -1,9 +1,8 @@
import asyncio
import sys
import traceback
from contextlib import asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from contextlib import AbstractContextManager, asynccontextmanager, contextmanager
from typing import AsyncIterator, Iterator, List, Optional
from rich.console import Console, Group
from rich.live import Live
@@ -261,7 +260,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str,
text: str,
total: Optional[float] = None,
) -> ContextManager[ProgressBar]:
) -> AbstractContextManager[ProgressBar]:
"""
Allows markup in the "style" argument which will be applied to the
"action" string.
@@ -277,7 +276,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str,
text: str,
total: Optional[float] = None,
) -> ContextManager[ProgressBar]:
) -> AbstractContextManager[ProgressBar]:
"""
Allows markup in the "style" argument which will be applied to the
"action" string.

View File

@@ -57,6 +57,7 @@ class OnConflict(Enum):
@dataclass
class Heuristics:
etag_differs: Optional[bool]
mtime: Optional[datetime]
@@ -233,8 +234,16 @@ class OutputDirectory:
remote_newer = None
# ETag should be a more reliable indicator than mtime, so we check it first
if heuristics.etag_differs is not None:
remote_newer = heuristics.etag_differs
if remote_newer:
log.explain("Remote file's entity tag differs")
else:
log.explain("Remote file's entity tag is the same")
# Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
if remote_newer is None and heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer:
@@ -362,10 +371,28 @@ class OutputDirectory:
raise OutputDirError("Failed to create temporary file")
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path)
return self._should_download(local_path, heuristics, redownload, on_conflict)
async def download(
self,
remote_path: PurePath,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
@@ -375,7 +402,7 @@ class OutputDirectory:
MarkConflictError.
"""
heuristics = Heuristics(mtime)
heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path)
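A hedged sketch of how a crawler can feed the new keyword arguments (the import path and surrounding names are assumptions; the kit-ipd changes earlier in this diff do essentially the same thing):

```python
from datetime import datetime
from pathlib import PurePath
from typing import Optional

from PFERD.output_dir import OutputDirectory  # import path is an assumption


def plan_download(
    output_dir: OutputDirectory,
    prev_etag: Optional[str],
    remote_etag: Optional[str],
    remote_mtime: Optional[datetime],
) -> bool:
    # The crawler only knows whether the ETag *differs*, not which side is newer.
    etag_differs = None if prev_etag is None else prev_etag != remote_etag
    return output_dir.should_try_download(
        PurePath("lecture/sheet01.pdf"),
        etag_differs=etag_differs,  # consulted first by the heuristics
        mtime=remote_mtime,         # fallback when the ETag is inconclusive
    )
```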

View File

@@ -1,4 +1,4 @@
from pathlib import Path
from pathlib import Path, PurePath
from typing import Dict, List, Optional
from rich.markup import escape
@@ -168,19 +168,24 @@ class Pferd:
log.report("")
log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")
def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters
link = crawler.output_dir.resolve(relative_path).absolute().as_uri()
return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False
for path in sorted(crawler.report.added_files):
something_changed = True
log.report(f" [bold bright_green]Added[/] {fmt_path(path)}")
log.report(f" [bold bright_green]Added[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.changed_files):
something_changed = True
log.report(f" [bold bright_yellow]Changed[/] {fmt_path(path)}")
log.report(f" [bold bright_yellow]Changed[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.deleted_files):
something_changed = True
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files):
something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path_link(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True
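`Path.as_uri()` already performs the percent-encoding that the comment in `fmt_path_link` asks for; a quick standalone check (POSIX path, values made up):

```python
from pathlib import Path

p = Path("/tmp/PFERD output/Übungsblatt 01.pdf")
print(p.as_uri())
# file:///tmp/PFERD%20output/%C3%9Cbungsblatt%2001.pdf
```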

View File

@@ -34,15 +34,6 @@ class MarkConflictError(Exception):
self.collides_with = collides_with
# TODO Use PurePath.is_relative_to when updating to 3.9
def is_relative_to(a: PurePath, b: PurePath) -> bool:
try:
a.relative_to(b)
return True
except ValueError:
return False
class Report:
"""
A report of a synchronization. Includes all files found by the crawler, as
@@ -173,7 +164,7 @@ class Report:
if path == other:
raise MarkDuplicateError(path)
if is_relative_to(path, other) or is_relative_to(other, path):
if path.is_relative_to(other) or other.is_relative_to(path):
raise MarkConflictError(path, other)
self.known_files.add(path)
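The stdlib method that replaces the removed helper has been available since Python 3.9 and behaves the same way; a quick sanity check:

```python
from pathlib import PurePath

print(PurePath("foo/bar/baz.pdf").is_relative_to(PurePath("foo/bar")))  # True
print(PurePath("foo/bar").is_relative_to(PurePath("foo/bar/baz.pdf")))  # False
```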

View File

@@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.6.0"
VERSION = "3.8.3"

View File

@@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip
Ensure you have at least Python 3.9 installed. Run the following command to
Ensure you have at least Python 3.11 installed. Run the following command to
install PFERD or upgrade it to the latest version:
```

flake.lock (generated): 8 changes
View File

@@ -2,16 +2,16 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1708979614,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=",
"lastModified": 1744440957,
"narHash": "sha256-FHlSkNqFmPxPJvy+6fNLaNeWnF1lZSgqVCl/eWaJRc4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a",
"rev": "26d499fc9f1d567283d5d56fcf367edd815dba1d",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.11",
"ref": "nixos-24.11",
"repo": "nixpkgs",
"type": "github"
}

View File

@@ -2,7 +2,7 @@
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
};
outputs = { self, nixpkgs }:

View File

@@ -12,7 +12,7 @@ dependencies = [
"certifi>=2021.10.8"
]
dynamic = ["version"]
requires-python = ">=3.9"
requires-python = ">=3.11"
[project.scripts]
pferd = "PFERD.__main__:main"