Compare commits

...

30 Commits

Author SHA1 Message Date
7291382430 Bump version to 3.8.0 2025-04-15 11:32:22 +02:00
1a430ad5d1 Update minimum Python version to 3.11 2025-04-15 11:31:39 +02:00
f6bdeb6b9d Support ILIAS 9 2025-04-15 11:19:53 +02:00
63f25277b0 Fix crawling of empty forum threads 2025-03-09 23:44:25 +01:00
c8eff04ae0 Make thread titles link to original ILIAS thread 2025-02-19 16:23:20 +01:00
edc482cdf4 Internalize images in forum threads 2025-02-19 16:23:20 +01:00
72cd0f77e2 Prettify forum thread exports
Co-authored-by: Tim <me@scriptim.dev>
2025-02-19 16:23:20 +01:00
be175f9347 Download only new/updated forum threads 2025-02-19 16:16:37 +01:00
ba2833dba5 Crawl all threads in a forum
Before this patch the row count was unconditionally changed to 800. This
patch tries to detect how many rows the forum has and then fetches this
amount, if it is larger than 800.
2025-02-19 12:19:33 +01:00
2f0e792670 Increase default http timeout to 30
Otherwise larger forums will fail to download in time
2025-02-19 12:19:13 +01:00
5f88539f7e Fix page size increase for forum threads 2025-02-19 12:19:11 +01:00
bd9d7efe64 "Fix" mypy errors
Thank you mypy, very cool. These types make things *so much better*.
They don't just complicate everything and don't really help because they
can not detect that an element queried by a tag is no navigable
string...
2025-02-19 12:15:41 +01:00
16a2dd5b15 fix: totp 2025-02-19 12:15:41 +01:00
678283d341 Use Python facilities to convert paths to file:// urls 2024-11-15 00:09:11 +01:00
287173b0b1 Bump version to 3.7.0 2024-11-13 20:38:27 +01:00
712217e959 Handle groups in cards 2024-11-11 12:53:08 +01:00
6dda4c55a8 Add doctype header to forum threads
This should fix mimetype detection on most systems and is more relevant
now that the report is clickable
2024-11-05 18:36:21 +01:00
596b6a7688 Add support for non-KIT shibboleth login (#98)
Co-authored-by: Mr-Pine <git@mr-pine.de>
Co-authored-by: I-Al-Istannen <I-Al-Istannen@users.noreply.github.com>
2024-11-05 18:30:34 +01:00
Tim
5983200247 Treat headings as folders in kit-ipd crawler (#99) 2024-11-04 23:53:48 +01:00
Tim
26e802d88b Add clickable links to file names in the printed report (#100)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-11-04 00:32:32 +01:00
f5c4e82816 Delay ilias loop detection after transform
This allows users to filter out duplicated elements and suppress the
warning.
2024-11-02 22:46:51 +01:00
f5273f7ca0 Collapse ilias url crawling into normal page crawling 2024-11-02 22:46:51 +01:00
fa71a9f44f Add support for mob videos in page descriptions 2024-10-28 20:35:30 +01:00
81d6ff53c4 Respect row flex in descriptions 2024-10-28 19:41:03 +01:00
d7a2b6e019 Delete videos from course descriptions 2024-10-28 19:41:03 +01:00
71c65e89d1 Internalize images in course descriptions 2024-10-28 19:41:03 +01:00
c1046498e7 Fix download of links without a target URL
They are now downloaded as links to the empty url.
2024-10-28 19:41:03 +01:00
8fbd1978af Fix crawling of nested courses 2024-10-28 18:52:27 +01:00
Tim
739dd95850 Use Last-Modified and ETag headers to determine KIT-IPD file versions (#95)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-10-27 19:03:47 +01:00
c54c3bcfa1 Fix crawling of favorites 2024-10-27 10:50:59 +01:00
24 changed files with 1566 additions and 906 deletions

View File

@ -14,7 +14,7 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest, macos-13, macos-latest] os: [ubuntu-latest, windows-latest, macos-13, macos-latest]
python: ["3.9"] python: ["3.11"]
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

View File

@ -22,6 +22,41 @@ ambiguous situations.
## Unreleased ## Unreleased
## 3.8.0 - 2025-04-15
### Added
- Support for ILIAS 9
### Changed
- Added prettier CSS to forum threads
- Increase minimum supported Python version to 3.11
## Fixed
- File links in report on Windows
- TOTP authentication in KIT Shibboleth
- Forum crawling only considering the first 20 entries
## 3.7.0 - 2024-11-13
### Added
- Support for MOB videos in page descriptions
- Clickable links in the report to directly open new/modified/not-deleted files
- Support for non KIT shibboleth login
### Changed
- Remove videos from description pages
- Perform ILIAS cycle detection after processing the transform to allow
ignoring duplicated elements
- Parse headings (h1-h3) as folders in kit-ipd crawler
### Fixed
- Personal desktop/dashboard/favorites crawling
- Crawling of nested courses
- Downloading of links with no target URL
- Handle row flex on description pages
- Add `<!DOCTYPE html>` heading to forum threads to fix mime type detection
- Handle groups in cards
## 3.6.0 - 2024-10-23 ## 3.6.0 - 2024-10-23
### Added ### Added

View File

@ -163,12 +163,13 @@ out of the box for the corresponding universities:
[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs" [ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs"
| University | `base_url` | `client_id` | | University | `base_url` | `login_type` | `client_id` |
|---------------|--------------------------------------|---------------| |---------------|-----------------------------------------|--------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | elearning | | FH Aachen | https://www.ili.fh-aachen.de | local | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | uk | | Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | ILIASKONSTANZ | | Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | Uni_Stuttgart | | Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart |
| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | |
If your university isn't listed, try navigating to your instance's login page. If your university isn't listed, try navigating to your instance's login page.
Assuming no custom login service is used, the URL will look something like this: Assuming no custom login service is used, the URL will look something like this:
@ -180,7 +181,11 @@ Assuming no custom login service is used, the URL will look something like this:
If the values work, feel free to submit a PR and add them to the table above. If the values work, feel free to submit a PR and add them to the table above.
- `base_url`: The URL where the ILIAS instance is located. (Required) - `base_url`: The URL where the ILIAS instance is located. (Required)
- `client_id`: An ID used for authentication. (Required) - `login_type`: How you authenticate. (Required)
- `local`: Use `client_id` for authentication.
- `shibboleth`: Use shibboleth for authentication.
- `client_id`: An ID used for authentication if `login_type` is `local`. Is
ignored if `login_type` is `shibboleth`.
- `target`: The ILIAS element to crawl. (Required) - `target`: The ILIAS element to crawl. (Required)
- `desktop`: Crawl your personal desktop / dashboard - `desktop`: Crawl your personal desktop / dashboard
- `<course id>`: Crawl the course with the given id - `<course id>`: Crawl the course with the given id
@ -191,6 +196,8 @@ If the values work, feel free to submit a PR and add them to the table above.
and duplication warnings if you are a member of an ILIAS group. The and duplication warnings if you are a member of an ILIAS group. The
`desktop` target is generally preferable. `desktop` target is generally preferable.
- `auth`: Name of auth section to use for login. (Required) - `auth`: Name of auth section to use for login. (Required)
- `tfa_auth`: Name of auth section to use for two-factor authentication. Only
uses the auth section's password. (Default: Anonymous `tfa` authenticator)
- `links`: How to represent external links. (Default: `fancy`) - `links`: How to represent external links. (Default: `fancy`)
- `ignore`: Don't download links. - `ignore`: Don't download links.
- `plaintext`: A text file containing only the URL. - `plaintext`: A text file containing only the URL.

View File

@ -1,6 +1,6 @@
Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero, TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine, p-fruck Mr-Pine, p-fruck, PinieP
Permission is hereby granted, free of charge, to any person obtaining a copy of Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in this software and associated documentation files (the "Software"), to deal in

View File

@ -1,4 +1,4 @@
from typing import Optional, Tuple from typing import Optional, Tuple, cast
import keyring import keyring
@ -13,7 +13,7 @@ class KeyringAuthSection(AuthSection):
return self.s.get("username") return self.s.get("username")
def keyring_name(self) -> str: def keyring_name(self) -> str:
return self.s.get("keyring_name", fallback=NAME) return cast(str, self.s.get("keyring_name", fallback=NAME))
class KeyringAuthenticator(Authenticator): class KeyringAuthenticator(Authenticator):

View File

@ -149,9 +149,7 @@ class CrawlerSection(Section):
return self.s.getboolean("skip", fallback=False) return self.s.getboolean("skip", fallback=False)
def output_dir(self, name: str) -> Path: def output_dir(self, name: str) -> Path:
# TODO Use removeprefix() after switching to 3.9 name = name.removeprefix("crawl:")
if name.startswith("crawl:"):
name = name[len("crawl:"):]
return Path(self.s.get("output_dir", name)).expanduser() return Path(self.s.get("output_dir", name)).expanduser()
def redownload(self) -> Redownload: def redownload(self) -> Redownload:
@ -258,6 +256,10 @@ class Crawler(ABC):
def prev_report(self) -> Optional[Report]: def prev_report(self) -> Optional[Report]:
return self._output_dir.prev_report return self._output_dir.prev_report
@property
def output_dir(self) -> OutputDirectory:
return self._output_dir
@staticmethod @staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
""" """
@ -290,9 +292,40 @@ class Crawler(ABC):
log.explain("Answer: Yes") log.explain("Answer: Yes")
return CrawlToken(self._limiter, path) return CrawlToken(self._limiter, path)
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
log.explain_topic(f"Decision: Should Download {fmt_path(path)}")
if self._transformer.transform(path) is None:
log.explain("Answer: No (ignored)")
return False
should_download = self._output_dir.should_try_download(
path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if should_download:
log.explain("Answer: Yes")
return True
else:
log.explain("Answer: No")
return False
async def download( async def download(
self, self,
path: PurePath, path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None, mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None, redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None, on_conflict: Optional[OnConflict] = None,
@ -307,7 +340,14 @@ class Crawler(ABC):
log.status("[bold bright_black]", "Ignored", fmt_path(path)) log.status("[bold bright_black]", "Ignored", fmt_path(path))
return None return None
fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict) fs_token = await self._output_dir.download(
path,
transformed_path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if fs_token is None: if fs_token is None:
log.explain("Answer: No") log.explain("Answer: No")
return None return None

View File

@ -1,12 +1,14 @@
import asyncio import asyncio
import http.cookies import http.cookies
import ssl import ssl
from datetime import datetime
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional, Tuple, cast
import aiohttp import aiohttp
import certifi import certifi
from aiohttp.client import ClientTimeout from aiohttp.client import ClientTimeout
from bs4 import Tag
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config from ..config import Config
@ -15,10 +17,12 @@ from ..utils import fmt_real_path
from ..version import NAME, VERSION from ..version import NAME, VERSION
from .crawler import Crawler, CrawlerSection from .crawler import Crawler, CrawlerSection
ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection): class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float: def http_timeout(self) -> float:
return self.s.getfloat("http_timeout", fallback=20) return self.s.getfloat("http_timeout", fallback=30)
class HttpCrawler(Crawler): class HttpCrawler(Crawler):
@ -169,6 +173,79 @@ class HttpCrawler(Crawler):
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e)) log.warn(str(e))
@staticmethod
def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
"""
Retrieves the hierarchy of headings associated with the give file link and constructs a folder
structure from them.
<h1> level headings usually only appear once and serve as the page title, so they would introduce
redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter.
"""
def find_associated_headings(tag: Tag, level: int) -> PurePath:
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = cast(Optional[Tag], tag.find_previous(name=f"h{level}"))
if level_heading is None:
return find_associated_headings(tag, level - 1)
folder_name = level_heading.get_text().strip()
return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names
return find_associated_headings(file_link, 3)
def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
"""
If available, retrieves the entity tag for a given path which was stored in the previous report.
"""
if not self._output_dir.prev_report:
return None
etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
return etags.get(str(path))
def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None:
"""
Adds an entity tag for a given path to the report's custom values.
"""
if not etag:
return
etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the according value will be None.
"""
try:
async with self.session.head(resource_url) as resp:
if resp.status != 200:
return None, None
etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified")
last_modified = None
if last_modified_header:
try:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives
datetime_format = "%a, %d %b %Y %H:%M:%S GMT"
last_modified = datetime.strptime(last_modified_header, datetime_format)
except ValueError:
# last_modified remains None
pass
return etag_header, last_modified
except aiohttp.ClientError:
return None, None
async def run(self) -> None: async def run(self) -> None:
self._request_count = 0 self._request_count = 0
self._cookie_jar = aiohttp.CookieJar() self._cookie_jar = aiohttp.CookieJar()
@ -186,7 +263,12 @@ class HttpCrawler(Crawler):
connect=self._http_timeout, connect=self._http_timeout,
sock_connect=self._http_timeout, sock_connect=self._http_timeout,
sock_read=self._http_timeout, sock_read=self._http_timeout,
) ),
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False
) as session: ) as session:
self.session = session self.session = session
try: try:

View File

@ -25,9 +25,10 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler
last_exception = e last_exception = e
log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}") log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
log.explain(f"Last exception: {last_exception!r}")
if last_exception: if last_exception:
message = f"Error in I/O Operation: {last_exception}" message = f"Error in I/O Operation: {last_exception!r}"
if failure_is_error: if failure_is_error:
raise CrawlError(message) from last_exception raise CrawlError(message) from last_exception
else: else:

View File

@ -1,5 +1,5 @@
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional, cast
import bs4 import bs4
@ -126,6 +126,88 @@ _learning_module_template = """
</html> </html>
""" """
_forum_thread_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ILIAS - Forum: {{name}}</title>
<style>
* {
box-sizing: border-box;
}
body {
font-family: 'Open Sans', Verdana, Arial, Helvetica, sans-serif;
padding: 8px;
}
ul, ol, p {
margin: 1.2em 0;
}
p {
margin-top: 8px;
margin-bottom: 8px;
}
a {
color: #00876c;
text-decoration: none;
cursor: pointer;
}
a:hover {
text-decoration: underline;
}
body > p:first-child > span:first-child {
font-size: 1.6em;
}
body > p:first-child > span:first-child ~ span.default {
display: inline-block;
font-size: 1.2em;
padding-bottom: 8px;
}
.ilFrmPostContent {
margin-top: 8px;
max-width: 64em;
}
.ilFrmPostContent > *:first-child {
margin-top: 0px;
}
.ilFrmPostTitle {
margin-top: 24px;
color: #00876c;
font-weight: bold;
}
#ilFrmPostList {
list-style: none;
padding-left: 0;
}
li.ilFrmPostRow {
padding: 3px 0 3px 3px;
margin-bottom: 24px;
border-left: 6px solid #dddddd;
}
.ilFrmPostRow > div {
display: flex;
}
.ilFrmPostImage img {
margin: 0 !important;
padding: 6px 9px 9px 6px;
}
.ilUserIcon {
width: 115px;
}
.small {
text-decoration: none;
font-size: 0.75rem;
color: #6f6f6f;
}
</style>
</head>
<body>
{{heading}}
{{content}}
</body>
</html>
""".strip() # noqa: E501 line too long
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str: def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those. # Seems to be comments, ignore those.
@ -139,13 +221,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
</div> </div>
""" """
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"): if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip() text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip()
left = f'<a href="{prev}">{text}</a>' left = f'<a href="{prev}">{text}</a>'
else: else:
left = "<span></span>" left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"): if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip() text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip()
right = f'<a href="{next}">{text}</a>' right = f'<a href="{next}">{text}</a>'
else: else:
right = "<span></span>" right = "<span></span>"
@ -160,8 +242,17 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
"{{left}}", left).replace("{{right}}", right).encode()) "{{left}}", left).replace("{{right}}", right).encode())
) )
body = body.prettify() body_str = cast(str, body.prettify())
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name) return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name)
def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str:
if title := cast(Optional[bs4.Tag], heading.find(name="b")):
title.wrap(bs4.Tag(name="a", attrs={"href": url}))
return _forum_thread_template \
.replace("{{name}}", name) \
.replace("{{heading}}", cast(str, heading.prettify())) \
.replace("{{content}}", cast(str, content.prettify()))
class Links(Enum): class Links(Enum):

View File

@ -1,3 +1,5 @@
from typing import cast
from bs4 import BeautifulSoup, Comment, Tag from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """ _STYLE_TAG_CONTENT = """
@ -12,6 +14,13 @@ _STYLE_TAG_CONTENT = """
font-weight: bold; font-weight: bold;
} }
.row-flex {
display: flex;
}
.row-flex-wrap {
flex-wrap: wrap;
}
.accordion-head { .accordion-head {
background-color: #f5f7fa; background-color: #f5f7fa;
padding: 0.5rem 0; padding: 0.5rem 0;
@ -63,18 +72,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
def clean(soup: BeautifulSoup) -> BeautifulSoup: def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES): for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)):
block.name = "article" block.name = "article"
for block in soup.find_all("h3"): for block in cast(list[Tag], soup.find_all("h3")):
block.name = "div" block.name = "div"
for block in soup.find_all("h1"): for block in cast(list[Tag], soup.find_all("h1")):
block.name = "h3" block.name = "h3"
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"): for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")):
block.name = "h3" block.name = "h3"
block["class"] += ["accordion-head"] block["class"] += ["accordion-head"] # type: ignore
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"): for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children) children = list(dummy.children)
@ -85,7 +94,12 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if isinstance(type(children[0]), Comment): if isinstance(type(children[0]), Comment):
dummy.decompose() dummy.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"): # Delete video figures, as they can not be internalized anyway
for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"):
if figure := video.find_parent("figure"):
figure.decompose()
for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")):
hrule_imposter.insert(0, soup.new_tag("hr")) hrule_imposter.insert(0, soup.new_tag("hr"))
return soup return soup

View File

@ -19,14 +19,20 @@ from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .async_helper import _iorepeat from .async_helper import _iorepeat
from .file_templates import Links, learning_module_template from .file_templates import Links, forum_thread_template, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int] TargetType = Union[str, int]
class LoginTypeLocal:
def __init__(self, client_id: str):
self.client_id = client_id
class IliasWebCrawlerSection(HttpCrawlerSection): class IliasWebCrawlerSection(HttpCrawlerSection):
def base_url(self) -> str: def base_url(self) -> str:
base_url = self.s.get("base_url") base_url = self.s.get("base_url")
@ -35,12 +41,30 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url return base_url
def client_id(self) -> str: def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
if login_type == "shibboleth":
return "shibboleth"
if login_type == "local":
client_id = self.s.get("client_id") client_id = self.s.get("client_id")
if not client_id: if not client_id:
self.missing_value("client_id") self.missing_value("client_id")
return LoginTypeLocal(client_id)
return client_id self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def target(self) -> TargetType: def target(self) -> TargetType:
target = self.s.get("target") target = self.s.get("target")
@ -85,19 +109,19 @@ _DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.EXERCISE_FILES, IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER, IliasElementType.FOLDER,
IliasElementType.INFO_TAB, IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEETING,
IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
} }
_VIDEO_ELEMENTS: Set[IliasElementType] = { _VIDEO_ELEMENTS: Set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO, IliasElementType.MEDIACAST_VIDEO,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO, IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.OPENCAST_VIDEO_PLAYER,
} }
@ -155,7 +179,13 @@ instance's greatest bottleneck.
self._auth = auth self._auth = auth
self._base_url = section.base_url() self._base_url = section.base_url()
self._client_id = section.client_id() self._tfa_auth = section.tfa_auth(authenticators)
self._login_type = section.login()
if isinstance(self._login_type, LoginTypeLocal):
self._client_id = self._login_type.client_id
else:
self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth)
self._target = section.target() self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay() self._link_file_redirect_delay = section.link_redirect_delay()
@ -178,94 +208,43 @@ instance's greatest bottleneck.
async def _crawl_course(self, course_id: int) -> None: async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course # Start crawling at the given course
root_url = url_set_query_param( root_url = url_set_query_param(
urljoin(self._base_url, "/goto.php"), urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}", "target", f"crs_{course_id}",
) )
await self._crawl_url(root_url, expected_id=course_id) await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None: async def _crawl_desktop(self) -> None:
appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items" await self._crawl_url(
appendix = appendix.encode("ASCII").hex() urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show")
await self._crawl_url(url_set_query_param( )
urljoin(self._base_url, "/gs_content.php"),
"item=", appendix,
))
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath(".")) if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id):
if not maybe_cl: await awaitable
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
while next_stage_url:
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
async def _handle_ilias_page( async def _handle_ilias_page(
self, self,
url: str, url: str,
parent: IliasPageElement, current_element: Optional[IliasPageElement],
path: PurePath, path: PurePath,
expected_course_id: Optional[int] = None,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path) maybe_cl = await self.crawl(path)
if not maybe_cl: if not maybe_cl:
return None return None
return self._crawl_ilias_page(url, parent, maybe_cl) if current_element:
self._ensure_not_seen(current_element, path)
return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id)
@anoncritical @anoncritical
async def _crawl_ilias_page( async def _crawl_ilias_page(
self, self,
url: str, url: str,
parent: IliasPageElement, current_element: Optional[IliasPageElement],
cl: CrawlToken, cl: CrawlToken,
expected_course_id: Optional[int] = None,
) -> None: ) -> None:
elements: List[IliasPageElement] = [] elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes # A list as variable redefinitions are not propagated to outer scopes
@ -276,19 +255,30 @@ instance's greatest bottleneck.
elements.clear() elements.clear()
async with cl: async with cl:
next_stage_url: Optional[str] = url next_stage_url: Optional[str] = url
current_parent = parent current_parent = current_element
page = None
while next_stage_url: while next_stage_url:
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}") log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
# If we expect to find a root course, enforce it
if current_parent is None and expected_course_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs/" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
if str(expected_course_id) not in perma_link:
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
page = IliasPage(soup, current_parent)
if next_element := page.get_next_stage_element(): if next_element := page.get_next_stage_element():
current_parent = next_element current_parent = next_element
next_stage_url = next_element.url next_stage_url = next_element.url
else: else:
next_stage_url = None next_stage_url = None
page = cast(IliasPage, page)
elements.extend(page.get_child_elements()) elements.extend(page.get_child_elements())
if description_string := page.get_description(): if description_string := page.get_description():
description.append(description_string) description.append(description_string)
@ -320,14 +310,6 @@ instance's greatest bottleneck.
parent_path: PurePath, parent_path: PurePath,
element: IliasPageElement, element: IliasPageElement,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
# element.name might contain `/` if the crawler created nested elements, # element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case # so we can not sanitize it here. We trust in the output dir to thwart worst-case
# directory escape attacks. # directory escape attacks.
@ -379,6 +361,54 @@ instance's greatest bottleneck.
"[bright_black](scorm learning modules are not supported)" "[bright_black](scorm learning modules are not supported)"
) )
return None return None
elif element.type == IliasElementType.LITERATURE_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](literature lists are not currently supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](HTML learning modules are not supported)"
)
return None
elif element.type == IliasElementType.BLOG:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](blogs are not currently supported)"
)
return None
elif element.type == IliasElementType.DCL_RECORD_LIST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](dcl record lists are not currently supported)"
)
return None
elif element.type == IliasElementType.MEDIA_POOL:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](media pools are not currently supported)"
)
return None
elif element.type == IliasElementType.COURSE:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](not descending into linked course, download it separately)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE: elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path) return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK: elif element.type == IliasElementType.LINK:
@ -391,6 +421,8 @@ instance's greatest bottleneck.
return await self._handle_opencast_video(element, element_path) return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO: elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path) return await self._handle_file(element, element_path)
elif element.type == IliasElementType.MOB_VIDEO:
return await self._handle_file(element, element_path, is_video=True)
elif element.type in _DIRECTORY_PAGES: elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path) return await self._handle_ilias_page(element.url, element, element_path)
else: else:
@ -466,6 +498,8 @@ instance's greatest bottleneck.
if not maybe_dl: if not maybe_dl:
return None return None
self._ensure_not_seen(element, element_path)
return self._download_booking(element, link_template_maybe, maybe_dl) return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical @anoncritical
@ -476,9 +510,10 @@ instance's greatest bottleneck.
if not dl: if not dl:
return return
async with dl as (bar, sink): async with dl as (_bar, sink):
description = clean(insert_base_markup(description)) description = clean(insert_base_markup(description))
sink.file.write(description.prettify().encode("utf-8")) description_tag = await self.internalize_images(description)
sink.file.write(cast(str, description_tag.prettify()).encode("utf-8"))
sink.done() sink.done()
@anoncritical @anoncritical
@ -493,17 +528,27 @@ instance's greatest bottleneck.
self._write_link_content(link_template, element.url, element.name, element.description, sink) self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str: async def _resolve_link_target(self, export_url: str) -> str:
async def impl() -> Optional[str]:
async with self.session.get(export_url, allow_redirects=False) as resp: async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated # No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers: if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip() return soupify(await resp.read()).select_one("a").get("href").strip() # type: ignore
# We are either unauthenticated or the link is not active
new_url = resp.headers[hdrs.LOCATION].lower()
if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url:
return ""
return None
await self._authenticate() auth_id = await self._current_auth_id()
target = await impl()
if target is not None:
return target
async with self.session.get(export_url, allow_redirects=False) as resp: await self.authenticate(auth_id)
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers: target = await impl()
return soupify(await resp.read()).select_one("a").get("href").strip() if target is not None:
return target
raise CrawlError("resolve_link_target failed even after authenticating") raise CrawlError("resolve_link_target failed even after authenticating")
@ -530,6 +575,8 @@ instance's greatest bottleneck.
if not maybe_dl: if not maybe_dl:
return None return None
self._ensure_not_seen(element, element_path)
# If we have every file from the cached mapping already, we can ignore this and bail # If we have every file from the cached mapping already, we can ignore this and bail
if self._all_opencast_videos_locally_present(element, maybe_dl.path): if self._all_opencast_videos_locally_present(element, maybe_dl.path):
# Mark all existing videos as known to ensure they do not get deleted during cleanup. # Mark all existing videos as known to ensure they do not get deleted during cleanup.
@ -590,7 +637,7 @@ instance's greatest bottleneck.
) )
async with dl as (bar, sink): async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element) page = IliasPage(await self._get_page(element.url), element)
stream_elements = page.get_child_elements() stream_elements = page.get_child_elements()
if len(stream_elements) > 1: if len(stream_elements) > 1:
@ -600,7 +647,7 @@ instance's greatest bottleneck.
stream_element = stream_elements[0] stream_element = stream_elements[0]
# We do not have a local cache yet # We do not have a local cache yet
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))]) add_to_report([str(self._transformer.transform(dl.path))])
return return
@ -615,7 +662,7 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}") log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element, sink, bar, is_video=True)
add_to_report(contained_video_paths) add_to_report(contained_video_paths)
@ -623,23 +670,33 @@ instance's greatest bottleneck.
self, self,
element: IliasPageElement, element: IliasPageElement,
element_path: PurePath, element_path: PurePath,
is_video: bool = False,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
maybe_dl = await self.download(element_path, mtime=element.mtime) maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl: if not maybe_dl:
return None return None
return self._download_file(element, maybe_dl) self._ensure_not_seen(element, element_path)
return self._download_file(element, maybe_dl, is_video)
@_iorepeat(3, "downloading file") @_iorepeat(3, "downloading file")
@anoncritical @anoncritical
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None: async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
assert dl # The function is only reached when dl is not None assert dl # The function is only reached when dl is not None
async with dl as (bar, sink): async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False) await self._stream_from_url(element, sink, bar, is_video)
async def _stream_from_url(
self,
element: IliasPageElement,
sink: FileSink,
bar: ProgressBar,
is_video: bool
) -> None:
url = element.url
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool: async def try_stream() -> bool:
next_url = url next_url = url
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML, # Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
# we can not match on the content type here. Instead, we disallow redirects and inspect the # we can not match on the content type here. Instead, we disallow redirects and inspect the
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
@ -663,6 +720,13 @@ instance's greatest bottleneck.
if is_video and "html" in resp.content_type: if is_video and "html" in resp.content_type:
return False return False
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None):
parts = content_range.split("/")
if len(parts) == 2 and parts[1].isdigit():
bar.set_total(int(parts[1]))
# Prefer the content length header
if resp.content_length: if resp.content_length:
bar.set_total(resp.content_length) bar.set_total(resp.content_length)
@ -680,7 +744,7 @@ instance's greatest bottleneck.
await self.authenticate(auth_id) await self.authenticate(auth_id)
if not await try_stream(): if not await try_stream():
raise CrawlError("File streaming failed after authenticate()") raise CrawlError(f"File streaming failed after authenticate() {element!r}")
async def _handle_forum( async def _handle_forum(
self, self,
@ -699,32 +763,66 @@ instance's greatest bottleneck.
async with cl: async with cl:
next_stage_url = element.url next_stage_url = element.url
page = None
while next_stage_url: while next_stage_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}") log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element) page = IliasPage(soup, element)
if next := page.get_next_stage_element(): if next := page.get_next_stage_element():
next_stage_url = next.url next_stage_url = next.url
else: else:
break break
download_data = page.get_download_forum_data() forum_threads: list[tuple[IliasPageElement, bool]] = []
if not download_data: for entry in cast(IliasPage, page).get_forum_entries():
raise CrawlWarning("Failed to extract forum data") path = cl.path / (_sanitize_path_name(entry.name) + ".html")
if download_data.empty: forum_threads.append((entry, self.should_try_download(path, mtime=entry.mtime)))
# Sort the ids. The forum download will *preserve* this ordering
forum_threads.sort(key=lambda elem: elem[0].id())
if not forum_threads:
log.explain("Forum had no threads") log.explain("Forum had no threads")
return return
download_data = cast(IliasPage, page).get_download_forum_data(
[thread.id() for thread, download in forum_threads if download]
)
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if not download_data.empty:
html = await self._post_authenticated(download_data.url, download_data.form_data) html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html)) elements = parse_ilias_forum_export(soupify(html))
else:
elements = []
elements.sort(key=lambda elem: elem.title) # Verify that ILIAS does not change the order, as we depend on it later. Otherwise, we could not call
# download in the correct order, potentially messing up duplication handling.
expected_element_titles = [thread.name for thread, download in forum_threads if download]
actual_element_titles = [_sanitize_path_name(thread.name) for thread in elements]
if expected_element_titles != actual_element_titles:
raise CrawlWarning(
f"Forum thread order mismatch: {expected_element_titles} != {actual_element_titles}"
)
tasks: List[Awaitable[None]] = [] tasks: List[Awaitable[None]] = []
for elem in elements: for thread, download in forum_threads:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem))) if download:
# This only works because ILIAS keeps the order in the export
elem = elements.pop(0)
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem, thread)))
else:
# We only downloaded the threads we "should_try_download"ed. This can be an
# over-approximation and all will be fine.
# If we selected too few, e.g. because there was a duplicate title and the mtime of the
# original is newer than the update of the duplicate.
# This causes stale data locally, but I consider this problem acceptable right now.
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, thread)))
# And execute them # And execute them
await self.gather(tasks) await self.gather(tasks)
@ -734,17 +832,22 @@ instance's greatest bottleneck.
async def _download_forum_thread( async def _download_forum_thread(
self, self,
parent_path: PurePath, parent_path: PurePath,
element: IliasForumThread, thread: Union[IliasForumThread, IliasPageElement],
element: IliasPageElement
) -> None: ) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html") path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime) maybe_dl = await self.download(path, mtime=thread.mtime)
if not maybe_dl: if not maybe_dl or not isinstance(thread, IliasForumThread):
return return
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
content = element.title_tag.prettify() rendered = forum_thread_template(
content += element.content_tag.prettify() thread.name,
sink.file.write(content.encode("utf-8")) element.url,
thread.name_tag,
await self.internalize_images(thread.content_tag)
)
sink.file.write(rendered.encode("utf-8"))
sink.done() sink.done()
async def _handle_learning_module( async def _handle_learning_module(
@ -755,6 +858,8 @@ instance's greatest bottleneck.
maybe_cl = await self.crawl(element_path) maybe_cl = await self.crawl(element_path)
if not maybe_cl: if not maybe_cl:
return None return None
self._ensure_not_seen(element, element_path)
return self._crawl_learning_module(element, maybe_cl) return self._crawl_learning_module(element, maybe_cl)
@_iorepeat(3, "crawling learning module") @_iorepeat(3, "crawling learning module")
@ -766,7 +871,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}") log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url) soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element) page = IliasPage(soup, element)
if next := page.get_learning_module_data(): if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction( elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element cl.path, next.previous_url, "left", element
@ -809,7 +914,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}") log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url) soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element) page = IliasPage(soup, parent_element)
if next := page.get_learning_module_data(): if next := page.get_learning_module_data():
elements.append(next) elements.append(next)
if dir == "left": if dir == "left":
@ -840,13 +945,13 @@ instance's greatest bottleneck.
if prev: if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p: if prev_p:
prev = os.path.relpath(prev_p, my_path.parent) prev = cast(str, os.path.relpath(prev_p, my_path.parent))
else: else:
prev = None prev = None
if next: if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p: if next_p:
next = os.path.relpath(next_p, my_path.parent) next = cast(str, os.path.relpath(next_p, my_path.parent))
else: else:
next = None next = None
@ -866,21 +971,30 @@ instance's greatest bottleneck.
continue continue
if elem.name == "img": if elem.name == "img":
if src := elem.attrs.get("src", None): if src := elem.attrs.get("src", None):
url = urljoin(self._base_url, src) url = urljoin(self._base_url, cast(str, src))
if not url.startswith(self._base_url): if not url.startswith(self._base_url):
continue continue
log.explain(f"Internalizing {url!r}") log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url) img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"): if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"):
# For unknown reasons the protocol seems to be stripped. # For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + elem.attrs["src"] elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"])
return tag return tag
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup: def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup:
auth_id = await self._current_auth_id() auth_id = await self._current_auth_id()
async with self.session.get(url) as request: async with self.session.get(url) as request:
soup = soupify(await request.read()) soup = IliasSoup(soupify(await request.read()), str(request.url))
if IliasPage.is_logged_in(soup): if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
@ -889,13 +1003,13 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die. # Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request: async with self.session.get(url) as request:
soup = soupify(await request.read()) soup = IliasSoup(soupify(await request.read()), str(request.url))
if IliasPage.is_logged_in(soup): if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}") raise CrawlError(f"get_page failed even after authenticating on {url!r}")
@staticmethod @staticmethod
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup: def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed: if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError( raise CrawlError(
"Unexpectedly encountered ILIAS root page. " "Unexpectedly encountered ILIAS root page. "
@ -947,10 +1061,11 @@ instance's greatest bottleneck.
return await request.read() return await request.read()
raise CrawlError("get_authenticated failed even after authenticating") raise CrawlError("get_authenticated failed even after authenticating")
# ToDo: Is iorepeat still required?
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None: async def _authenticate(self) -> None:
# fill the session with the correct cookies # fill the session with the correct cookies
if self._login_type == "shibboleth":
await self._shibboleth_login.login(self.session)
else:
params = { params = {
"client_id": self._client_id, "client_id": self._client_id,
"cmd": "force_login", "cmd": "force_login",
@ -958,11 +1073,11 @@ instance's greatest bottleneck.
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read()) login_page = soupify(await request.read())
login_form = login_page.find("form", attrs={"name": "formlogin"}) login_form = cast(Optional[Tag], login_page.find("form", attrs={"name": "formlogin"}))
if login_form is None: if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.") raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action") login_url = cast(Optional[str], login_form.attrs.get("action"))
if login_url is None: if login_url is None:
raise CrawlError("Could not find the action URL in the login form!") raise CrawlError("Could not find the action URL in the login form!")
@ -976,34 +1091,6 @@ instance's greatest bottleneck.
# do the actual login # do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request: async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read()) soup = IliasSoup(soupify(await request.read()), str(request.url))
if not self._is_logged_in(soup): if not IliasPage.is_logged_in(soup):
self._auth.invalidate_credentials() self._auth.invalidate_credentials()
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False

File diff suppressed because it is too large Load Diff

View File

@ -1,23 +1,14 @@
from typing import Any, Dict, Optional, Union from typing import Dict, Literal
import aiohttp from ...auth import Authenticator
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...config import Config from ...config import Config
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError, CrawlWarning
from .async_helper import _iorepeat
from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
_ILIAS_URL = "https://ilias.studium.kit.edu" _ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful(): class KitShibbolethBackgroundLoginSuccessful:
pass pass
@ -25,19 +16,8 @@ class KitIliasWebCrawlerSection(IliasWebCrawlerSection):
def base_url(self) -> str: def base_url(self) -> str:
return _ILIAS_URL return _ILIAS_URL
def client_id(self) -> str: def login(self) -> Literal["shibboleth"]:
# KIT ILIAS uses the Shibboleth service for authentication. There's no return "shibboleth"
# use for a client id.
return "unused"
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
class KitIliasWebCrawler(IliasWebCrawler): class KitIliasWebCrawler(IliasWebCrawler):
@ -46,184 +26,12 @@ class KitIliasWebCrawler(IliasWebCrawler):
name: str, name: str,
section: KitIliasWebCrawlerSection, section: KitIliasWebCrawlerSection,
config: Config, config: Config,
authenticators: Dict[str, Authenticator] authenticators: Dict[str, Authenticator],
): ):
super().__init__(name, section, config, authenticators) super().__init__(name, section, config, authenticators)
self._shibboleth_login = KitShibbolethLogin( self._shibboleth_login = ShibbolethLogin(
_ILIAS_URL,
self._auth, self._auth,
section.tfa_auth(authenticators), section.tfa_auth(authenticators),
) )
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
class KitShibbolethLogin:
"""
Login via KIT's shibboleth system.
"""
def __init__(self, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]) -> None:
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{_ILIAS_URL}/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"csrf_token": csrf_token
}
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
"csrf_token": csrf_token
}
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
aiohttp unescapes '/' and ':' in URL query parameters which is not RFC compliant and rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling location header was a good idea??
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())

View File

@ -0,0 +1,129 @@
from typing import Any, Optional, cast
import aiohttp
import yarl
from bs4 import BeautifulSoup, Tag
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError
class ShibbolethLogin:
"""
Login via shibboleth system.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{self._ilias_url}/shib_login.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"fudis_web_authn_assertion_input": "",
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = form = soup.find("form", {"method": "post"})["action"] # type: ignore
data = { # using the info obtained in the while loop above
"RelayState": cast(str, relay_state["value"]),
"SAMLResponse": cast(str, saml_response["value"]),
}
await sess.post(cast(str, url), data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"fudis_otp_input": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="fudiscr-form") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())

View File

@ -1,8 +1,9 @@
import os import os
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast
from urllib.parse import urljoin from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -31,24 +32,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return re.compile(regex) return re.compile(regex)
@dataclass(unsafe_hash=True) @dataclass
class KitIpdFile: class KitIpdFile:
name: str name: str
url: str url: str
def explain(self) -> None:
log.explain(f"File {self.name!r} (href={self.url!r})")
@dataclass @dataclass
class KitIpdFolder: class KitIpdFolder:
name: str name: str
files: List[KitIpdFile] entries: List[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None: def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}") log.explain_topic(f"Folder {self.name!r}")
for file in self.files: for entry in self.entries:
log.explain(f"File {file.name!r} (href={file.url!r})") entry.explain()
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler): class KitIpdCrawler(HttpCrawler):
@ -72,81 +73,96 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_cl: async with maybe_cl:
for item in await self._fetch_items(): for item in await self._fetch_items():
item.explain()
if isinstance(item, KitIpdFolder): if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item)) tasks.append(self._crawl_folder(PurePath("."), item))
else: else:
# Orphan files are placed in the root folder log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})")
tasks.append(self._download_file(PurePath("."), item)) log.explain("Attributing it to root folder")
# do this here to at least be sequential and not parallel (rate limiting is hard, as the
# crawl abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(item.url)
tasks.append(self._download_file(PurePath("."), item, etag, mtime))
await self.gather(tasks) await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None: async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None:
path = PurePath(folder.name) path = parent / folder.name
if not await self.crawl(path): if not await self.crawl(path):
return return
tasks = [self._download_file(path, file) for file in folder.files] tasks = []
for entry in folder.entries:
if isinstance(entry, KitIpdFolder):
tasks.append(self._crawl_folder(path, entry))
else:
# do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
# abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(entry.url)
tasks.append(self._download_file(path, entry, etag, mtime))
await self.gather(tasks) await self.gather(tasks)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None: async def _download_file(
self,
parent: PurePath,
file: KitIpdFile,
etag: Optional[str],
mtime: Optional[datetime]
) -> None:
element_path = parent / file.name element_path = parent / file.name
maybe_dl = await self.download(element_path)
prev_etag = self._get_previous_etag_from_report(element_path)
etag_differs = None if prev_etag is None else prev_etag != etag
maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
if not maybe_dl: if not maybe_dl:
# keep storing the known file's etag
if prev_etag:
self._add_etag_to_report(element_path, prev_etag)
return return
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar) await self._stream_from_url(file.url, element_path, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]: async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page() page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page) elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
folder_tree: KitIpdFolder = KitIpdFolder(".", [])
for element in elements: for element in elements:
folder_label = self._find_folder_label(element) parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1)
if folder_label:
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element, url) file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
return items current_folder: KitIpdFolder = folder_tree
for folder_name in parent.parts:
# helps the type checker to verify that current_folder is indeed a folder
def subfolders() -> Generator[KitIpdFolder, Any, None]:
return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder: if not any(entry.name == folder_name for entry in subfolders()):
files: List[KitIpdFile] = [] current_folder.entries.append(KitIpdFolder(folder_name, []))
name = folder_tag.getText().strip() current_folder = next(entry for entry in subfolders() if entry.name == folder_name)
container: Tag = folder_tag.findNextSibling(name="table") current_folder.entries.append(file)
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
return KitIpdFolder(name, files) return folder_tree.entries
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag, url: str) -> KitIpdFile: def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link) url = self._abs_url_from_link(url, link)
name = os.path.basename(url) name = os.path.basename(url)
return KitIpdFile(name, url) return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]: def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex}) return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex}))
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str: def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href")) return urljoin(url, cast(str, link_tag.get("href")))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None: async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp: async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403: if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?") raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
@ -159,6 +175,8 @@ class KitIpdCrawler(HttpCrawler):
sink.done() sink.done()
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]: async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request: async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some # The web page for Algorithmen für Routenplanung contains some

View File

@ -1,9 +1,8 @@
import asyncio import asyncio
import sys import sys
import traceback import traceback
from contextlib import asynccontextmanager, contextmanager from contextlib import AbstractContextManager, asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated from typing import AsyncIterator, Iterator, List, Optional
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, Group from rich.console import Console, Group
from rich.live import Live from rich.live import Live
@ -261,7 +260,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str, action: str,
text: str, text: str,
total: Optional[float] = None, total: Optional[float] = None,
) -> ContextManager[ProgressBar]: ) -> AbstractContextManager[ProgressBar]:
""" """
Allows markup in the "style" argument which will be applied to the Allows markup in the "style" argument which will be applied to the
"action" string. "action" string.
@ -277,7 +276,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action: str, action: str,
text: str, text: str,
total: Optional[float] = None, total: Optional[float] = None,
) -> ContextManager[ProgressBar]: ) -> AbstractContextManager[ProgressBar]:
""" """
Allows markup in the "style" argument which will be applied to the Allows markup in the "style" argument which will be applied to the
"action" string. "action" string.

View File

@ -57,6 +57,7 @@ class OnConflict(Enum):
@dataclass @dataclass
class Heuristics: class Heuristics:
etag_differs: Optional[bool]
mtime: Optional[datetime] mtime: Optional[datetime]
@ -233,8 +234,16 @@ class OutputDirectory:
remote_newer = None remote_newer = None
# ETag should be a more reliable indicator than mtime, so we check it first
if heuristics.etag_differs is not None:
remote_newer = heuristics.etag_differs
if remote_newer:
log.explain("Remote file's entity tag differs")
else:
log.explain("Remote file's entity tag is the same")
# Python on Windows crashes when faced with timestamps around the unix epoch # Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970): if remote_newer is None and heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer: if remote_newer:
@ -362,10 +371,28 @@ class OutputDirectory:
raise OutputDirError("Failed to create temporary file") raise OutputDirError("Failed to create temporary file")
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path)
return self._should_download(local_path, heuristics, redownload, on_conflict)
async def download( async def download(
self, self,
remote_path: PurePath, remote_path: PurePath,
path: PurePath, path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None, mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None, redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None, on_conflict: Optional[OnConflict] = None,
@ -375,7 +402,7 @@ class OutputDirectory:
MarkConflictError. MarkConflictError.
""" """
heuristics = Heuristics(mtime) heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path) local_path = self.resolve(path)

View File

@ -1,4 +1,4 @@
from pathlib import Path from pathlib import Path, PurePath
from typing import Dict, List, Optional from typing import Dict, List, Optional
from rich.markup import escape from rich.markup import escape
@ -168,19 +168,24 @@ class Pferd:
log.report("") log.report("")
log.report(f"[bold bright_cyan]Report[/] for {escape(name)}") log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")
def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters
link = crawler.output_dir.resolve(relative_path).absolute().as_uri()
return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False something_changed = False
for path in sorted(crawler.report.added_files): for path in sorted(crawler.report.added_files):
something_changed = True something_changed = True
log.report(f" [bold bright_green]Added[/] {fmt_path(path)}") log.report(f" [bold bright_green]Added[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.changed_files): for path in sorted(crawler.report.changed_files):
something_changed = True something_changed = True
log.report(f" [bold bright_yellow]Changed[/] {fmt_path(path)}") log.report(f" [bold bright_yellow]Changed[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.deleted_files): for path in sorted(crawler.report.deleted_files):
something_changed = True something_changed = True
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}") log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files): for path in sorted(crawler.report.not_deleted_files):
something_changed = True something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}") log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path_link(path)}")
for warning in crawler.report.encountered_warnings: for warning in crawler.report.encountered_warnings:
something_changed = True something_changed = True

View File

@ -34,15 +34,6 @@ class MarkConflictError(Exception):
self.collides_with = collides_with self.collides_with = collides_with
# TODO Use PurePath.is_relative_to when updating to 3.9
def is_relative_to(a: PurePath, b: PurePath) -> bool:
try:
a.relative_to(b)
return True
except ValueError:
return False
class Report: class Report:
""" """
A report of a synchronization. Includes all files found by the crawler, as A report of a synchronization. Includes all files found by the crawler, as
@ -173,7 +164,7 @@ class Report:
if path == other: if path == other:
raise MarkDuplicateError(path) raise MarkDuplicateError(path)
if is_relative_to(path, other) or is_relative_to(other, path): if path.is_relative_to(other) or other.is_relative_to(path):
raise MarkConflictError(path, other) raise MarkConflictError(path, other)
self.known_files.add(path) self.known_files.add(path)

View File

@ -1,2 +1,2 @@
NAME = "PFERD" NAME = "PFERD"
VERSION = "3.6.0" VERSION = "3.8.0"

View File

@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip ### With pip
Ensure you have at least Python 3.9 installed. Run the following command to Ensure you have at least Python 3.11 installed. Run the following command to
install PFERD or upgrade it to the latest version: install PFERD or upgrade it to the latest version:
``` ```

8
flake.lock generated
View File

@ -2,16 +2,16 @@
"nodes": { "nodes": {
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1708979614, "lastModified": 1744440957,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=", "narHash": "sha256-FHlSkNqFmPxPJvy+6fNLaNeWnF1lZSgqVCl/eWaJRc4=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a", "rev": "26d499fc9f1d567283d5d56fcf367edd815dba1d",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-23.11", "ref": "nixos-24.11",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }

View File

@ -2,7 +2,7 @@
description = "Tool for downloading course-related files from ILIAS"; description = "Tool for downloading course-related files from ILIAS";
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
}; };
outputs = { self, nixpkgs }: outputs = { self, nixpkgs }:

View File

@ -12,7 +12,7 @@ dependencies = [
"certifi>=2021.10.8" "certifi>=2021.10.8"
] ]
dynamic = ["version"] dynamic = ["version"]
requires-python = ">=3.9" requires-python = ">=3.11"
[project.scripts] [project.scripts]
pferd = "PFERD.__main__:main" pferd = "PFERD.__main__:main"