Compare commits

..

17 Commits

Author SHA1 Message Date
4af02012bc Strip long path prefix from file links in report 2024-11-14 20:06:13 +01:00
287173b0b1 Bump version to 3.7.0 2024-11-13 20:38:27 +01:00
712217e959 Handle groups in cards 2024-11-11 12:53:08 +01:00
6dda4c55a8 Add doctype header to forum threads
This should fix mimetype detection on most systems and is more relevant
now that the report is clickable
2024-11-05 18:36:21 +01:00
596b6a7688 Add support for non-KIT shibboleth login (#98)
Co-authored-by: Mr-Pine <git@mr-pine.de>
Co-authored-by: I-Al-Istannen <I-Al-Istannen@users.noreply.github.com>
2024-11-05 18:30:34 +01:00
Tim
5983200247 Treat headings as folders in kit-ipd crawler (#99) 2024-11-04 23:53:48 +01:00
Tim
26e802d88b Add clickable links to file names in the printed report (#100)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-11-04 00:32:32 +01:00
f5c4e82816 Delay ilias loop detection after transform
This allows users to filter out duplicated elements and suppress the
warning.
2024-11-02 22:46:51 +01:00
f5273f7ca0 Collapse ilias url crawling into normal page crawling 2024-11-02 22:46:51 +01:00
fa71a9f44f Add support for mob videos in page descriptions 2024-10-28 20:35:30 +01:00
81d6ff53c4 Respect row flex in descriptions 2024-10-28 19:41:03 +01:00
d7a2b6e019 Delete videos from course descriptions 2024-10-28 19:41:03 +01:00
71c65e89d1 Internalize images in course descriptions 2024-10-28 19:41:03 +01:00
c1046498e7 Fix download of links without a target URL
They are now downloaded as links to the empty url.
2024-10-28 19:41:03 +01:00
8fbd1978af Fix crawling of nested courses 2024-10-28 18:52:27 +01:00
Tim
739dd95850 Use Last-Modified and ETag headers to determine KIT-IPD file versions (#95)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-10-27 19:03:47 +01:00
c54c3bcfa1 Fix crawling of favorites 2024-10-27 10:50:59 +01:00
15 changed files with 569 additions and 397 deletions

View File

@ -22,6 +22,27 @@ ambiguous situations.
## Unreleased ## Unreleased
## 3.7.0 - 2024-11-13
### Added
- Support for MOB videos in page descriptions
- Clickable links in the report to directly open new/modified/not-deleted files
- Support for non KIT shibboleth login
### Changed
- Remove videos from description pages
- Perform ILIAS cycle detection after processing the transform to allow
ignoring duplicated elements
- Parse headings (h1-h3) as folders in kit-ipd crawler
### Fixed
- Personal desktop/dashboard/favorites crawling
- Crawling of nested courses
- Downloading of links with no target URL
- Handle row flex on description pages
- Add `<!DOCTYPE html>` heading to forum threads to fix mime type detection
- Handle groups in cards
## 3.6.0 - 2024-10-23 ## 3.6.0 - 2024-10-23
### Added ### Added

View File

@ -163,12 +163,13 @@ out of the box for the corresponding universities:
[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs" [ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs"
| University | `base_url` | `client_id` | | University | `base_url` | `login_type` | `client_id` |
|---------------|--------------------------------------|---------------| |---------------|-----------------------------------------|--------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | elearning | | FH Aachen | https://www.ili.fh-aachen.de | local | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | uk | | Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | ILIASKONSTANZ | | Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | Uni_Stuttgart | | Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart |
| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | |
If your university isn't listed, try navigating to your instance's login page. If your university isn't listed, try navigating to your instance's login page.
Assuming no custom login service is used, the URL will look something like this: Assuming no custom login service is used, the URL will look something like this:
@ -180,7 +181,11 @@ Assuming no custom login service is used, the URL will look something like this:
If the values work, feel free to submit a PR and add them to the table above. If the values work, feel free to submit a PR and add them to the table above.
- `base_url`: The URL where the ILIAS instance is located. (Required) - `base_url`: The URL where the ILIAS instance is located. (Required)
- `client_id`: An ID used for authentication. (Required) - `login_type`: How you authenticate. (Required)
- `local`: Use `client_id` for authentication.
- `shibboleth`: Use shibboleth for authentication.
- `client_id`: An ID used for authentication if `login_type` is `local`. Is
ignored if `login_type` is `shibboleth`.
- `target`: The ILIAS element to crawl. (Required) - `target`: The ILIAS element to crawl. (Required)
- `desktop`: Crawl your personal desktop / dashboard - `desktop`: Crawl your personal desktop / dashboard
- `<course id>`: Crawl the course with the given id - `<course id>`: Crawl the course with the given id
@ -191,6 +196,8 @@ If the values work, feel free to submit a PR and add them to the table above.
and duplication warnings if you are a member of an ILIAS group. The and duplication warnings if you are a member of an ILIAS group. The
`desktop` target is generally preferable. `desktop` target is generally preferable.
- `auth`: Name of auth section to use for login. (Required) - `auth`: Name of auth section to use for login. (Required)
- `tfa_auth`: Name of auth section to use for two-factor authentication. Only
uses the auth section's password. (Default: Anonymous `tfa` authenticator)
- `links`: How to represent external links. (Default: `fancy`) - `links`: How to represent external links. (Default: `fancy`)
- `ignore`: Don't download links. - `ignore`: Don't download links.
- `plaintext`: A text file containing only the URL. - `plaintext`: A text file containing only the URL.

View File

@ -1,6 +1,6 @@
Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero, TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine, p-fruck Mr-Pine, p-fruck, PinieP
Permission is hereby granted, free of charge, to any person obtaining a copy of Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in this software and associated documentation files (the "Software"), to deal in

View File

@ -258,6 +258,10 @@ class Crawler(ABC):
def prev_report(self) -> Optional[Report]: def prev_report(self) -> Optional[Report]:
return self._output_dir.prev_report return self._output_dir.prev_report
@property
def output_dir(self) -> OutputDirectory:
return self._output_dir
@staticmethod @staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
""" """
@ -293,6 +297,8 @@ class Crawler(ABC):
async def download( async def download(
self, self,
path: PurePath, path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None, mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None, redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None, on_conflict: Optional[OnConflict] = None,
@ -307,7 +313,14 @@ class Crawler(ABC):
log.status("[bold bright_black]", "Ignored", fmt_path(path)) log.status("[bold bright_black]", "Ignored", fmt_path(path))
return None return None
fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict) fs_token = await self._output_dir.download(
path,
transformed_path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if fs_token is None: if fs_token is None:
log.explain("Answer: No") log.explain("Answer: No")
return None return None

View File

@ -1,12 +1,14 @@
import asyncio import asyncio
import http.cookies import http.cookies
import ssl import ssl
from datetime import datetime
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional, Tuple
import aiohttp import aiohttp
import certifi import certifi
from aiohttp.client import ClientTimeout from aiohttp.client import ClientTimeout
from bs4 import Tag
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config from ..config import Config
@ -15,6 +17,8 @@ from ..utils import fmt_real_path
from ..version import NAME, VERSION from ..version import NAME, VERSION
from .crawler import Crawler, CrawlerSection from .crawler import Crawler, CrawlerSection
ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection): class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float: def http_timeout(self) -> float:
@ -169,6 +173,78 @@ class HttpCrawler(Crawler):
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e)) log.warn(str(e))
@staticmethod
def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
"""
Retrieves the hierarchy of headings associated with the give file link and constructs a folder
structure from them.
<h1> level headings usually only appear once and serve as the page title, so they would introduce
redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter.
"""
def find_associated_headings(tag: Tag, level: int) -> PurePath:
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = tag.find_previous(name=f"h{level}")
if level_heading is None:
return find_associated_headings(tag, level - 1)
folder_name = level_heading.getText().strip()
return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names
return find_associated_headings(file_link, 3)
def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
"""
If available, retrieves the entity tag for a given path which was stored in the previous report.
"""
if not self._output_dir.prev_report:
return None
etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
return etags.get(str(path))
def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None:
"""
Adds an entity tag for a given path to the report's custom values.
"""
if not etag:
return
etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the according value will be None.
"""
try:
async with self.session.head(resource_url) as resp:
if resp.status != 200:
return None, None
etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified")
if last_modified_header:
try:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives
datetime_format = "%a, %d %b %Y %H:%M:%S GMT"
last_modified = datetime.strptime(last_modified_header, datetime_format)
except ValueError:
# last_modified remains None
pass
return etag_header, last_modified
except aiohttp.ClientError:
return None, None
async def run(self) -> None: async def run(self) -> None:
self._request_count = 0 self._request_count = 0
self._cookie_jar = aiohttp.CookieJar() self._cookie_jar = aiohttp.CookieJar()
@ -186,7 +262,12 @@ class HttpCrawler(Crawler):
connect=self._http_timeout, connect=self._http_timeout,
sock_connect=self._http_timeout, sock_connect=self._http_timeout,
sock_read=self._http_timeout, sock_read=self._http_timeout,
) ),
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False
) as session: ) as session:
self.session = session self.session = session
try: try:

View File

@ -25,9 +25,10 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler
last_exception = e last_exception = e
log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}") log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
log.explain(f"Last exception: {last_exception!r}")
if last_exception: if last_exception:
message = f"Error in I/O Operation: {last_exception}" message = f"Error in I/O Operation: {last_exception!r}"
if failure_is_error: if failure_is_error:
raise CrawlError(message) from last_exception raise CrawlError(message) from last_exception
else: else:

View File

@ -12,6 +12,13 @@ _STYLE_TAG_CONTENT = """
font-weight: bold; font-weight: bold;
} }
.row-flex {
display: flex;
}
.row-flex-wrap {
flex-wrap: wrap;
}
.accordion-head { .accordion-head {
background-color: #f5f7fa; background-color: #f5f7fa;
padding: 0.5rem 0; padding: 0.5rem 0;
@ -85,6 +92,11 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if isinstance(type(children[0]), Comment): if isinstance(type(children[0]), Comment):
dummy.decompose() dummy.decompose()
# Delete video figures, as they can not be internalized anyway
for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"):
if figure := video.find_parent("figure"):
figure.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"): for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr")) hrule_imposter.insert(0, soup.new_tag("hr"))

View File

@ -23,10 +23,16 @@ from .file_templates import Links, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int] TargetType = Union[str, int]
class LoginTypeLocal:
def __init__(self, client_id: str):
self.client_id = client_id
class IliasWebCrawlerSection(HttpCrawlerSection): class IliasWebCrawlerSection(HttpCrawlerSection):
def base_url(self) -> str: def base_url(self) -> str:
base_url = self.s.get("base_url") base_url = self.s.get("base_url")
@ -35,12 +41,30 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url return base_url
def client_id(self) -> str: def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
client_id = self.s.get("client_id") login_type = self.s.get("login_type")
if not client_id: if not login_type:
self.missing_value("client_id") self.missing_value("login_type")
if login_type == "shibboleth":
return "shibboleth"
if login_type == "local":
client_id = self.s.get("client_id")
if not client_id:
self.missing_value("client_id")
return LoginTypeLocal(client_id)
return client_id self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def target(self) -> TargetType: def target(self) -> TargetType:
target = self.s.get("target") target = self.s.get("target")
@ -81,23 +105,24 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
_DIRECTORY_PAGES: Set[IliasElementType] = { _DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.COURSE,
IliasElementType.EXERCISE, IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES, IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER, IliasElementType.FOLDER,
IliasElementType.INFO_TAB, IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEETING,
IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
} }
_VIDEO_ELEMENTS: Set[IliasElementType] = { _VIDEO_ELEMENTS: Set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO, IliasElementType.MEDIACAST_VIDEO,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO, IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.OPENCAST_VIDEO_PLAYER,
} }
@ -155,7 +180,13 @@ instance's greatest bottleneck.
self._auth = auth self._auth = auth
self._base_url = section.base_url() self._base_url = section.base_url()
self._client_id = section.client_id() self._tfa_auth = section.tfa_auth(authenticators)
self._login_type = section.login()
if isinstance(self._login_type, LoginTypeLocal):
self._client_id = self._login_type.client_id
else:
self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth)
self._target = section.target() self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay() self._link_file_redirect_delay = section.link_redirect_delay()
@ -178,94 +209,43 @@ instance's greatest bottleneck.
async def _crawl_course(self, course_id: int) -> None: async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course # Start crawling at the given course
root_url = url_set_query_param( root_url = url_set_query_param(
urljoin(self._base_url, "/goto.php"), urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}", "target", f"crs_{course_id}",
) )
await self._crawl_url(root_url, expected_id=course_id) await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None: async def _crawl_desktop(self) -> None:
appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items" await self._crawl_url(
appendix = appendix.encode("ASCII").hex() urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show")
await self._crawl_url(url_set_query_param( )
urljoin(self._base_url, "/gs_content.php"),
"item=", appendix,
))
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath(".")) if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id):
if not maybe_cl: await awaitable
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
while next_stage_url:
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
async def _handle_ilias_page( async def _handle_ilias_page(
self, self,
url: str, url: str,
parent: IliasPageElement, current_element: Optional[IliasPageElement],
path: PurePath, path: PurePath,
expected_course_id: Optional[int] = None,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path) maybe_cl = await self.crawl(path)
if not maybe_cl: if not maybe_cl:
return None return None
return self._crawl_ilias_page(url, parent, maybe_cl) if current_element:
self._ensure_not_seen(current_element, path)
return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id)
@anoncritical @anoncritical
async def _crawl_ilias_page( async def _crawl_ilias_page(
self, self,
url: str, url: str,
parent: IliasPageElement, current_element: Optional[IliasPageElement],
cl: CrawlToken, cl: CrawlToken,
expected_course_id: Optional[int] = None,
) -> None: ) -> None:
elements: List[IliasPageElement] = [] elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes # A list as variable redefinitions are not propagated to outer scopes
@ -276,12 +256,21 @@ instance's greatest bottleneck.
elements.clear() elements.clear()
async with cl: async with cl:
next_stage_url: Optional[str] = url next_stage_url: Optional[str] = url
current_parent = parent current_parent = current_element
while next_stage_url: while next_stage_url:
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}") log.explain(f"URL: {next_stage_url}")
# If we expect to find a root course, enforce it
if current_parent is None and expected_course_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
if str(expected_course_id) not in perma_link:
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
page = IliasPage(soup, next_stage_url, current_parent) page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element(): if next_element := page.get_next_stage_element():
current_parent = next_element current_parent = next_element
@ -320,14 +309,6 @@ instance's greatest bottleneck.
parent_path: PurePath, parent_path: PurePath,
element: IliasPageElement, element: IliasPageElement,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
# element.name might contain `/` if the crawler created nested elements, # element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case # so we can not sanitize it here. We trust in the output dir to thwart worst-case
# directory escape attacks. # directory escape attacks.
@ -391,6 +372,8 @@ instance's greatest bottleneck.
return await self._handle_opencast_video(element, element_path) return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO: elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path) return await self._handle_file(element, element_path)
elif element.type == IliasElementType.MOB_VIDEO:
return await self._handle_file(element, element_path, is_video=True)
elif element.type in _DIRECTORY_PAGES: elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path) return await self._handle_ilias_page(element.url, element, element_path)
else: else:
@ -466,6 +449,8 @@ instance's greatest bottleneck.
if not maybe_dl: if not maybe_dl:
return None return None
self._ensure_not_seen(element, element_path)
return self._download_booking(element, link_template_maybe, maybe_dl) return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical @anoncritical
@ -478,6 +463,7 @@ instance's greatest bottleneck.
async with dl as (bar, sink): async with dl as (bar, sink):
description = clean(insert_base_markup(description)) description = clean(insert_base_markup(description))
description = await self.internalize_images(description)
sink.file.write(description.prettify().encode("utf-8")) sink.file.write(description.prettify().encode("utf-8"))
sink.done() sink.done()
@ -493,17 +479,27 @@ instance's greatest bottleneck.
self._write_link_content(link_template, element.url, element.name, element.description, sink) self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str: async def _resolve_link_target(self, export_url: str) -> str:
async with self.session.get(export_url, allow_redirects=False) as resp: async def impl() -> Optional[str]:
# No redirect means we were authenticated async with self.session.get(export_url, allow_redirects=False) as resp:
if hdrs.LOCATION not in resp.headers: # No redirect means we were authenticated
return soupify(await resp.read()).select_one("a").get("href").strip() if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
# We are either unauthenticated or the link is not active
new_url = resp.headers[hdrs.LOCATION].lower()
if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url:
return ""
return None
await self._authenticate() auth_id = await self._current_auth_id()
target = await impl()
if target is not None:
return target
async with self.session.get(export_url, allow_redirects=False) as resp: await self.authenticate(auth_id)
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers: target = await impl()
return soupify(await resp.read()).select_one("a").get("href").strip() if target is not None:
return target
raise CrawlError("resolve_link_target failed even after authenticating") raise CrawlError("resolve_link_target failed even after authenticating")
@ -530,6 +526,8 @@ instance's greatest bottleneck.
if not maybe_dl: if not maybe_dl:
return None return None
self._ensure_not_seen(element, element_path)
# If we have every file from the cached mapping already, we can ignore this and bail # If we have every file from the cached mapping already, we can ignore this and bail
if self._all_opencast_videos_locally_present(element, maybe_dl.path): if self._all_opencast_videos_locally_present(element, maybe_dl.path):
# Mark all existing videos as known to ensure they do not get deleted during cleanup. # Mark all existing videos as known to ensure they do not get deleted during cleanup.
@ -623,18 +621,21 @@ instance's greatest bottleneck.
self, self,
element: IliasPageElement, element: IliasPageElement,
element_path: PurePath, element_path: PurePath,
is_video: bool = False,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
maybe_dl = await self.download(element_path, mtime=element.mtime) maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl: if not maybe_dl:
return None return None
return self._download_file(element, maybe_dl) self._ensure_not_seen(element, element_path)
return self._download_file(element, maybe_dl, is_video)
@_iorepeat(3, "downloading file") @_iorepeat(3, "downloading file")
@anoncritical @anoncritical
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None: async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
assert dl # The function is only reached when dl is not None assert dl # The function is only reached when dl is not None
async with dl as (bar, sink): async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False) await self._stream_from_url(element.url, sink, bar, is_video)
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None: async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool: async def try_stream() -> bool:
@ -663,6 +664,13 @@ instance's greatest bottleneck.
if is_video and "html" in resp.content_type: if is_video and "html" in resp.content_type:
return False return False
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None):
parts = content_range.split("/")
if len(parts) == 2 and parts[1].isdigit():
bar.set_total(int(parts[1]))
# Prefer the content length header
if resp.content_length: if resp.content_length:
bar.set_total(resp.content_length) bar.set_total(resp.content_length)
@ -742,7 +750,8 @@ instance's greatest bottleneck.
return return
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
content = element.title_tag.prettify() content = "<!DOCTYPE html>\n"
content += element.title_tag.prettify()
content += element.content_tag.prettify() content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8")) sink.file.write(content.encode("utf-8"))
sink.done() sink.done()
@ -755,6 +764,8 @@ instance's greatest bottleneck.
maybe_cl = await self.crawl(element_path) maybe_cl = await self.crawl(element_path)
if not maybe_cl: if not maybe_cl:
return None return None
self._ensure_not_seen(element, element_path)
return self._crawl_learning_module(element, maybe_cl) return self._crawl_learning_module(element, maybe_cl)
@_iorepeat(3, "crawling learning module") @_iorepeat(3, "crawling learning module")
@ -877,6 +888,15 @@ instance's greatest bottleneck.
elem.attrs["src"] = "https:" + elem.attrs["src"] elem.attrs["src"] = "https:" + elem.attrs["src"]
return tag return tag
def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup: async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
auth_id = await self._current_auth_id() auth_id = await self._current_auth_id()
async with self.session.get(url) as request: async with self.session.get(url) as request:
@ -947,38 +967,39 @@ instance's greatest bottleneck.
return await request.read() return await request.read()
raise CrawlError("get_authenticated failed even after authenticating") raise CrawlError("get_authenticated failed even after authenticating")
# ToDo: Is iorepeat still required?
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None: async def _authenticate(self) -> None:
# fill the session with the correct cookies # fill the session with the correct cookies
params = { if self._login_type == "shibboleth":
"client_id": self._client_id, await self._shibboleth_login.login(self.session)
"cmd": "force_login", else:
} params = {
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: "client_id": self._client_id,
login_page = soupify(await request.read()) "cmd": "force_login",
}
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
login_form = login_page.find("form", attrs={"name": "formlogin"}) login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None: if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.") raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action") login_url = login_form.attrs.get("action")
if login_url is None: if login_url is None:
raise CrawlError("Could not find the action URL in the login form!") raise CrawlError("Could not find the action URL in the login form!")
username, password = await self._auth.credentials() username, password = await self._auth.credentials()
login_data = { login_data = {
"username": username, "username": username,
"password": password, "password": password,
"cmd[doStandardAuthentication]": "Login", "cmd[doStandardAuthentication]": "Login",
} }
# do the actual login # do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request: async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read()) soup = soupify(await request.read())
if not self._is_logged_in(soup): if not self._is_logged_in(soup):
self._auth.invalidate_credentials() self._auth.invalidate_credentials()
@staticmethod @staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool: def _is_logged_in(soup: BeautifulSoup) -> bool:

View File

@ -15,25 +15,27 @@ TargetType = Union[str, int]
class IliasElementType(Enum): class IliasElementType(Enum):
BOOKING = "booking"
COURSE = "course"
EXERCISE = "exercise" EXERCISE = "exercise"
EXERCISE_FILES = "exercise_files" # own submitted files EXERCISE_FILES = "exercise_files" # own submitted files
TEST = "test" # an online test. Will be ignored currently.
FILE = "file" FILE = "file"
FOLDER = "folder" FOLDER = "folder"
FORUM = "forum" FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab" INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module" LEARNING_MODULE = "learning_module"
BOOKING = "booking" LINK = "link"
MEETING = "meeting"
SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEDIACAST_VIDEO = "mediacast_video" MEDIACAST_VIDEO = "mediacast_video"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEETING = "meeting"
MOB_VIDEO = "mob_video"
OPENCAST_VIDEO = "opencast_video" OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder" OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated" OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
SCORM_LEARNING_MODULE = "scorm_learning_module"
SURVEY = "survey"
TEST = "test" # an online test. Will be ignored currently.
@dataclass @dataclass
@ -322,7 +324,7 @@ class IliasPage:
return False return False
def _is_personal_desktop(self) -> bool: def _is_personal_desktop(self) -> bool:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}) return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower()
def _is_content_page(self) -> bool: def _is_content_page(self) -> bool:
if link := self.get_permalink(): if link := self.get_permalink():
@ -427,9 +429,14 @@ class IliasPage:
def _find_personal_desktop_entries(self) -> List[IliasPageElement]: def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = [] items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select(".il-item-title") titles: List[Tag] = self._soup.select("#block_pditems_0 .il-item-title")
for title in titles: for title in titles:
link = title.find("a") link = title.find("a")
if not link:
log.explain(f"Skipping offline item: {title.getText().strip()!r}")
continue
name = _sanitize_path_name(link.text.strip()) name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link) url = self._abs_url_from_link(link)
@ -739,6 +746,7 @@ class IliasPage:
result += self._find_cards() result += self._find_cards()
result += self._find_mediacast_videos() result += self._find_mediacast_videos()
result += self._find_mob_videos()
return result return result
@ -767,6 +775,37 @@ class IliasPage:
return videos return videos
def _find_mob_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for figure in self._soup.select("figure.ilc_media_cont_MediaContainerHighlighted"):
title = figure.select_one("figcaption").getText().strip() + ".mp4"
video_element = figure.select_one("video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mob video '{title}'")
continue
url = None
for source in video_element.select("source"):
if source.get("type", "") == "video/mp4":
url = source.get("src")
break
if url is None:
_unexpected_html_warning()
log.warn_contd(f"No <source> element found for mob video '{title}'")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO,
url=self._abs_url_from_relative(url),
name=_sanitize_path_name(title),
mtime=None
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]: def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td") description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td: if not description_td:
@ -959,10 +998,14 @@ class IliasPage:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]: if "exc" in icon["class"]:
return IliasElementType.EXERCISE return IliasElementType.EXERCISE
if "grp" in icon["class"]:
return IliasElementType.FOLDER
if "webr" in icon["class"]: if "webr" in icon["class"]:
return IliasElementType.LINK return IliasElementType.LINK
if "book" in icon["class"]: if "book" in icon["class"]:
return IliasElementType.BOOKING return IliasElementType.BOOKING
if "crsr" in icon["class"]:
return IliasElementType.COURSE
if "frm" in icon["class"]: if "frm" in icon["class"]:
return IliasElementType.FORUM return IliasElementType.FORUM
if "sess" in icon["class"]: if "sess" in icon["class"]:

View File

@ -1,23 +1,14 @@
from typing import Any, Dict, Optional, Union from typing import Dict, Literal
import aiohttp from ...auth import Authenticator
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...config import Config from ...config import Config
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError, CrawlWarning
from .async_helper import _iorepeat
from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
_ILIAS_URL = "https://ilias.studium.kit.edu" _ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful(): class KitShibbolethBackgroundLoginSuccessful:
pass pass
@ -25,19 +16,8 @@ class KitIliasWebCrawlerSection(IliasWebCrawlerSection):
def base_url(self) -> str: def base_url(self) -> str:
return _ILIAS_URL return _ILIAS_URL
def client_id(self) -> str: def login(self) -> Literal["shibboleth"]:
# KIT ILIAS uses the Shibboleth service for authentication. There's no return "shibboleth"
# use for a client id.
return "unused"
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
class KitIliasWebCrawler(IliasWebCrawler): class KitIliasWebCrawler(IliasWebCrawler):
@ -46,184 +26,12 @@ class KitIliasWebCrawler(IliasWebCrawler):
name: str, name: str,
section: KitIliasWebCrawlerSection, section: KitIliasWebCrawlerSection,
config: Config, config: Config,
authenticators: Dict[str, Authenticator] authenticators: Dict[str, Authenticator],
): ):
super().__init__(name, section, config, authenticators) super().__init__(name, section, config, authenticators)
self._shibboleth_login = KitShibbolethLogin( self._shibboleth_login = ShibbolethLogin(
_ILIAS_URL,
self._auth, self._auth,
section.tfa_auth(authenticators), section.tfa_auth(authenticators),
) )
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
class KitShibbolethLogin:
"""
Login via KIT's shibboleth system.
"""
def __init__(self, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]) -> None:
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{_ILIAS_URL}/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"csrf_token": csrf_token
}
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
"csrf_token": csrf_token
}
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
aiohttp unescapes '/' and ':' in URL query parameters which is not RFC compliant and rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling location header was a good idea??
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())

View File

@ -0,0 +1,128 @@
from typing import Any, Optional
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError
class ShibbolethLogin:
"""
Login via shibboleth system.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{self._ilias_url}/shib_login.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = form = soup.find("form", {"method": "post"})["action"]
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())

View File

@ -1,8 +1,9 @@
import os import os
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union
from urllib.parse import urljoin from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -31,24 +32,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return re.compile(regex) return re.compile(regex)
@dataclass(unsafe_hash=True) @dataclass
class KitIpdFile: class KitIpdFile:
name: str name: str
url: str url: str
def explain(self) -> None:
log.explain(f"File {self.name!r} (href={self.url!r})")
@dataclass @dataclass
class KitIpdFolder: class KitIpdFolder:
name: str name: str
files: List[KitIpdFile] entries: List[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None: def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}") log.explain_topic(f"Folder {self.name!r}")
for file in self.files: for entry in self.entries:
log.explain(f"File {file.name!r} (href={file.url!r})") entry.explain()
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler): class KitIpdCrawler(HttpCrawler):
@ -72,68 +73,83 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_cl: async with maybe_cl:
for item in await self._fetch_items(): for item in await self._fetch_items():
item.explain()
if isinstance(item, KitIpdFolder): if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item)) tasks.append(self._crawl_folder(PurePath("."), item))
else: else:
# Orphan files are placed in the root folder log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})")
tasks.append(self._download_file(PurePath("."), item)) log.explain("Attributing it to root folder")
# do this here to at least be sequential and not parallel (rate limiting is hard, as the
# crawl abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(item.url)
tasks.append(self._download_file(PurePath("."), item, etag, mtime))
await self.gather(tasks) await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None: async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None:
path = PurePath(folder.name) path = parent / folder.name
if not await self.crawl(path): if not await self.crawl(path):
return return
tasks = [self._download_file(path, file) for file in folder.files] tasks = []
for entry in folder.entries:
if isinstance(entry, KitIpdFolder):
tasks.append(self._crawl_folder(path, entry))
else:
# do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
# abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(entry.url)
tasks.append(self._download_file(path, entry, etag, mtime))
await self.gather(tasks) await self.gather(tasks)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None: async def _download_file(
self,
parent: PurePath,
file: KitIpdFile,
etag: Optional[str],
mtime: Optional[datetime]
) -> None:
element_path = parent / file.name element_path = parent / file.name
maybe_dl = await self.download(element_path)
prev_etag = self._get_previous_etag_from_report(element_path)
etag_differs = None if prev_etag is None else prev_etag != etag
maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
if not maybe_dl: if not maybe_dl:
# keep storing the known file's etag
if prev_etag:
self._add_etag_to_report(element_path, prev_etag)
return return
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar) await self._stream_from_url(file.url, element_path, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]: async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page() page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page) elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
folder_tree: KitIpdFolder = KitIpdFolder(".", [])
for element in elements: for element in elements:
folder_label = self._find_folder_label(element) parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1)
if folder_label: file = self._extract_file(element, url)
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
return items current_folder: KitIpdFolder = folder_tree
for folder_name in parent.parts:
# helps the type checker to verify that current_folder is indeed a folder
def subfolders() -> Generator[KitIpdFolder, Any, None]:
return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder: if not any(entry.name == folder_name for entry in subfolders()):
files: List[KitIpdFile] = [] current_folder.entries.append(KitIpdFolder(folder_name, []))
name = folder_tag.getText().strip() current_folder = next(entry for entry in subfolders() if entry.name == folder_name)
container: Tag = folder_tag.findNextSibling(name="table") current_folder.entries.append(file)
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
return KitIpdFolder(name, files) return folder_tree.entries
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag, url: str) -> KitIpdFile: def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link) url = self._abs_url_from_link(url, link)
@ -146,7 +162,7 @@ class KitIpdCrawler(HttpCrawler):
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str: def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href")) return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None: async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp: async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403: if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?") raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
@ -159,6 +175,8 @@ class KitIpdCrawler(HttpCrawler):
sink.done() sink.done()
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]: async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request: async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some # The web page for Algorithmen für Routenplanung contains some

View File

@ -57,6 +57,7 @@ class OnConflict(Enum):
@dataclass @dataclass
class Heuristics: class Heuristics:
etag_differs: Optional[bool]
mtime: Optional[datetime] mtime: Optional[datetime]
@ -233,8 +234,16 @@ class OutputDirectory:
remote_newer = None remote_newer = None
# ETag should be a more reliable indicator than mtime, so we check it first
if heuristics.etag_differs is not None:
remote_newer = heuristics.etag_differs
if remote_newer:
log.explain("Remote file's entity tag differs")
else:
log.explain("Remote file's entity tag is the same")
# Python on Windows crashes when faced with timestamps around the unix epoch # Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970): if remote_newer is None and heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer: if remote_newer:
@ -366,6 +375,8 @@ class OutputDirectory:
self, self,
remote_path: PurePath, remote_path: PurePath,
path: PurePath, path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None, mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None, redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None, on_conflict: Optional[OnConflict] = None,
@ -375,7 +386,7 @@ class OutputDirectory:
MarkConflictError. MarkConflictError.
""" """
heuristics = Heuristics(mtime) heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path) local_path = self.resolve(path)

View File

@ -1,5 +1,6 @@
from pathlib import Path from pathlib import Path, PurePath
from typing import Dict, List, Optional from typing import Dict, List, Optional
from urllib.parse import quote
from rich.markup import escape from rich.markup import escape
@ -168,19 +169,26 @@ class Pferd:
log.report("") log.report("")
log.report(f"[bold bright_cyan]Report[/] for {escape(name)}") log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")
def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters
absolute_path = str(crawler.output_dir.resolve(relative_path).absolute())
absolute_path = absolute_path.replace("\\\\?\\", "")
link = f"file://{quote(absolute_path)}"
return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False something_changed = False
for path in sorted(crawler.report.added_files): for path in sorted(crawler.report.added_files):
something_changed = True something_changed = True
log.report(f" [bold bright_green]Added[/] {fmt_path(path)}") log.report(f" [bold bright_green]Added[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.changed_files): for path in sorted(crawler.report.changed_files):
something_changed = True something_changed = True
log.report(f" [bold bright_yellow]Changed[/] {fmt_path(path)}") log.report(f" [bold bright_yellow]Changed[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.deleted_files): for path in sorted(crawler.report.deleted_files):
something_changed = True something_changed = True
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}") log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files): for path in sorted(crawler.report.not_deleted_files):
something_changed = True something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}") log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path_link(path)}")
for warning in crawler.report.encountered_warnings: for warning in crawler.report.encountered_warnings:
something_changed = True something_changed = True

View File

@ -1,2 +1,2 @@
NAME = "PFERD" NAME = "PFERD"
VERSION = "3.6.0" VERSION = "3.7.0"