Compare commits

..

17 Commits

Author SHA1 Message Date
4af02012bc Strip long path prefix from file links in report 2024-11-14 20:06:13 +01:00
287173b0b1 Bump version to 3.7.0 2024-11-13 20:38:27 +01:00
712217e959 Handle groups in cards 2024-11-11 12:53:08 +01:00
6dda4c55a8 Add doctype header to forum threads
This should fix mimetype detection on most systems and is more relevant
now that the report is clickable
2024-11-05 18:36:21 +01:00
596b6a7688 Add support for non-KIT shibboleth login (#98)
Co-authored-by: Mr-Pine <git@mr-pine.de>
Co-authored-by: I-Al-Istannen <I-Al-Istannen@users.noreply.github.com>
2024-11-05 18:30:34 +01:00
Tim
5983200247 Treat headings as folders in kit-ipd crawler (#99) 2024-11-04 23:53:48 +01:00
Tim
26e802d88b Add clickable links to file names in the printed report (#100)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-11-04 00:32:32 +01:00
f5c4e82816 Delay ilias loop detection after transform
This allows users to filter out duplicated elements and suppress the
warning.
2024-11-02 22:46:51 +01:00
f5273f7ca0 Collapse ilias url crawling into normal page crawling 2024-11-02 22:46:51 +01:00
fa71a9f44f Add support for mob videos in page descriptions 2024-10-28 20:35:30 +01:00
81d6ff53c4 Respect row flex in descriptions 2024-10-28 19:41:03 +01:00
d7a2b6e019 Delete videos from course descriptions 2024-10-28 19:41:03 +01:00
71c65e89d1 Internalize images in course descriptions 2024-10-28 19:41:03 +01:00
c1046498e7 Fix download of links without a target URL
They are now downloaded as links to the empty url.
2024-10-28 19:41:03 +01:00
8fbd1978af Fix crawling of nested courses 2024-10-28 18:52:27 +01:00
Tim
739dd95850 Use Last-Modified and ETag headers to determine KIT-IPD file versions (#95)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-10-27 19:03:47 +01:00
c54c3bcfa1 Fix crawling of favorites 2024-10-27 10:50:59 +01:00
15 changed files with 569 additions and 397 deletions

View File

@ -22,6 +22,27 @@ ambiguous situations.
## Unreleased
## 3.7.0 - 2024-11-13
### Added
- Support for MOB videos in page descriptions
- Clickable links in the report to directly open new/modified/not-deleted files
- Support for non KIT shibboleth login
### Changed
- Remove videos from description pages
- Perform ILIAS cycle detection after processing the transform to allow
ignoring duplicated elements
- Parse headings (h1-h3) as folders in kit-ipd crawler
### Fixed
- Personal desktop/dashboard/favorites crawling
- Crawling of nested courses
- Downloading of links with no target URL
- Handle row flex on description pages
- Add `<!DOCTYPE html>` heading to forum threads to fix mime type detection
- Handle groups in cards
## 3.6.0 - 2024-10-23
### Added

View File

@ -163,12 +163,13 @@ out of the box for the corresponding universities:
[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs"
| University | `base_url` | `client_id` |
|---------------|--------------------------------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | Uni_Stuttgart |
| University | `base_url` | `login_type` | `client_id` |
|---------------|-----------------------------------------|--------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | local | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart |
| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | |
If your university isn't listed, try navigating to your instance's login page.
Assuming no custom login service is used, the URL will look something like this:
@ -180,7 +181,11 @@ Assuming no custom login service is used, the URL will look something like this:
If the values work, feel free to submit a PR and add them to the table above.
- `base_url`: The URL where the ILIAS instance is located. (Required)
- `client_id`: An ID used for authentication. (Required)
- `login_type`: How you authenticate. (Required)
- `local`: Use `client_id` for authentication.
- `shibboleth`: Use shibboleth for authentication.
- `client_id`: An ID used for authentication if `login_type` is `local`. Is
ignored if `login_type` is `shibboleth`.
- `target`: The ILIAS element to crawl. (Required)
- `desktop`: Crawl your personal desktop / dashboard
- `<course id>`: Crawl the course with the given id
@ -191,6 +196,8 @@ If the values work, feel free to submit a PR and add them to the table above.
and duplication warnings if you are a member of an ILIAS group. The
`desktop` target is generally preferable.
- `auth`: Name of auth section to use for login. (Required)
- `tfa_auth`: Name of auth section to use for two-factor authentication. Only
uses the auth section's password. (Default: Anonymous `tfa` authenticator)
- `links`: How to represent external links. (Default: `fancy`)
- `ignore`: Don't download links.
- `plaintext`: A text file containing only the URL.

View File

@ -1,6 +1,6 @@
Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine, p-fruck
Mr-Pine, p-fruck, PinieP
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -258,6 +258,10 @@ class Crawler(ABC):
def prev_report(self) -> Optional[Report]:
return self._output_dir.prev_report
@property
def output_dir(self) -> OutputDirectory:
return self._output_dir
@staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
"""
@ -293,6 +297,8 @@ class Crawler(ABC):
async def download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
@ -307,7 +313,14 @@ class Crawler(ABC):
log.status("[bold bright_black]", "Ignored", fmt_path(path))
return None
fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict)
fs_token = await self._output_dir.download(
path,
transformed_path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if fs_token is None:
log.explain("Answer: No")
return None

View File

@ -1,12 +1,14 @@
import asyncio
import http.cookies
import ssl
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import aiohttp
import certifi
from aiohttp.client import ClientTimeout
from bs4 import Tag
from ..auth import Authenticator
from ..config import Config
@ -15,6 +17,8 @@ from ..utils import fmt_real_path
from ..version import NAME, VERSION
from .crawler import Crawler, CrawlerSection
ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float:
@ -169,6 +173,78 @@ class HttpCrawler(Crawler):
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e))
@staticmethod
def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
"""
Retrieves the hierarchy of headings associated with the give file link and constructs a folder
structure from them.
<h1> level headings usually only appear once and serve as the page title, so they would introduce
redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter.
"""
def find_associated_headings(tag: Tag, level: int) -> PurePath:
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = tag.find_previous(name=f"h{level}")
if level_heading is None:
return find_associated_headings(tag, level - 1)
folder_name = level_heading.getText().strip()
return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names
return find_associated_headings(file_link, 3)
def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
"""
If available, retrieves the entity tag for a given path which was stored in the previous report.
"""
if not self._output_dir.prev_report:
return None
etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
return etags.get(str(path))
def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None:
"""
Adds an entity tag for a given path to the report's custom values.
"""
if not etag:
return
etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the according value will be None.
"""
try:
async with self.session.head(resource_url) as resp:
if resp.status != 200:
return None, None
etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified")
if last_modified_header:
try:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives
datetime_format = "%a, %d %b %Y %H:%M:%S GMT"
last_modified = datetime.strptime(last_modified_header, datetime_format)
except ValueError:
# last_modified remains None
pass
return etag_header, last_modified
except aiohttp.ClientError:
return None, None
async def run(self) -> None:
self._request_count = 0
self._cookie_jar = aiohttp.CookieJar()
@ -186,7 +262,12 @@ class HttpCrawler(Crawler):
connect=self._http_timeout,
sock_connect=self._http_timeout,
sock_read=self._http_timeout,
)
),
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False
) as session:
self.session = session
try:

View File

@ -25,9 +25,10 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler
last_exception = e
log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
log.explain(f"Last exception: {last_exception!r}")
if last_exception:
message = f"Error in I/O Operation: {last_exception}"
message = f"Error in I/O Operation: {last_exception!r}"
if failure_is_error:
raise CrawlError(message) from last_exception
else:

View File

@ -12,6 +12,13 @@ _STYLE_TAG_CONTENT = """
font-weight: bold;
}
.row-flex {
display: flex;
}
.row-flex-wrap {
flex-wrap: wrap;
}
.accordion-head {
background-color: #f5f7fa;
padding: 0.5rem 0;
@ -85,6 +92,11 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if isinstance(type(children[0]), Comment):
dummy.decompose()
# Delete video figures, as they can not be internalized anyway
for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"):
if figure := video.find_parent("figure"):
figure.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr"))

View File

@ -23,10 +23,16 @@ from .file_templates import Links, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
class LoginTypeLocal:
def __init__(self, client_id: str):
self.client_id = client_id
class IliasWebCrawlerSection(HttpCrawlerSection):
def base_url(self) -> str:
base_url = self.s.get("base_url")
@ -35,12 +41,30 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url
def client_id(self) -> str:
client_id = self.s.get("client_id")
if not client_id:
self.missing_value("client_id")
def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
if login_type == "shibboleth":
return "shibboleth"
if login_type == "local":
client_id = self.s.get("client_id")
if not client_id:
self.missing_value("client_id")
return LoginTypeLocal(client_id)
return client_id
self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def target(self) -> TargetType:
target = self.s.get("target")
@ -81,23 +105,24 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
_DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.COURSE,
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEETING,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
_VIDEO_ELEMENTS: Set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.OPENCAST_VIDEO_PLAYER,
}
@ -155,7 +180,13 @@ instance's greatest bottleneck.
self._auth = auth
self._base_url = section.base_url()
self._client_id = section.client_id()
self._tfa_auth = section.tfa_auth(authenticators)
self._login_type = section.login()
if isinstance(self._login_type, LoginTypeLocal):
self._client_id = self._login_type.client_id
else:
self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth)
self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay()
@ -178,94 +209,43 @@ instance's greatest bottleneck.
async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course
root_url = url_set_query_param(
urljoin(self._base_url, "/goto.php"),
urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}",
)
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items"
appendix = appendix.encode("ASCII").hex()
await self._crawl_url(url_set_query_param(
urljoin(self._base_url, "/gs_content.php"),
"item=", appendix,
))
await self._crawl_url(
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show")
)
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
while next_stage_url:
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id):
await awaitable
async def _handle_ilias_page(
self,
url: str,
parent: IliasPageElement,
current_element: Optional[IliasPageElement],
path: PurePath,
expected_course_id: Optional[int] = None,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
return None
return self._crawl_ilias_page(url, parent, maybe_cl)
if current_element:
self._ensure_not_seen(current_element, path)
return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id)
@anoncritical
async def _crawl_ilias_page(
self,
url: str,
parent: IliasPageElement,
current_element: Optional[IliasPageElement],
cl: CrawlToken,
expected_course_id: Optional[int] = None,
) -> None:
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
@ -276,12 +256,21 @@ instance's greatest bottleneck.
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = parent
current_parent = current_element
while next_stage_url:
soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
# If we expect to find a root course, enforce it
if current_parent is None and expected_course_id is not None:
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
if str(expected_course_id) not in perma_link:
raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
@ -320,14 +309,6 @@ instance's greatest bottleneck.
parent_path: PurePath,
element: IliasPageElement,
) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
# element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
# directory escape attacks.
@ -391,6 +372,8 @@ instance's greatest bottleneck.
return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.MOB_VIDEO:
return await self._handle_file(element, element_path, is_video=True)
elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path)
else:
@ -466,6 +449,8 @@ instance's greatest bottleneck.
if not maybe_dl:
return None
self._ensure_not_seen(element, element_path)
return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical
@ -478,6 +463,7 @@ instance's greatest bottleneck.
async with dl as (bar, sink):
description = clean(insert_base_markup(description))
description = await self.internalize_images(description)
sink.file.write(description.prettify().encode("utf-8"))
sink.done()
@ -493,17 +479,27 @@ instance's greatest bottleneck.
self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
async def impl() -> Optional[str]:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
# We are either unauthenticated or the link is not active
new_url = resp.headers[hdrs.LOCATION].lower()
if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url:
return ""
return None
await self._authenticate()
auth_id = await self._current_auth_id()
target = await impl()
if target is not None:
return target
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
await self.authenticate(auth_id)
target = await impl()
if target is not None:
return target
raise CrawlError("resolve_link_target failed even after authenticating")
@ -530,6 +526,8 @@ instance's greatest bottleneck.
if not maybe_dl:
return None
self._ensure_not_seen(element, element_path)
# If we have every file from the cached mapping already, we can ignore this and bail
if self._all_opencast_videos_locally_present(element, maybe_dl.path):
# Mark all existing videos as known to ensure they do not get deleted during cleanup.
@ -623,18 +621,21 @@ instance's greatest bottleneck.
self,
element: IliasPageElement,
element_path: PurePath,
is_video: bool = False,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_file(element, maybe_dl)
self._ensure_not_seen(element, element_path)
return self._download_file(element, maybe_dl, is_video)
@_iorepeat(3, "downloading file")
@anoncritical
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False)
await self._stream_from_url(element.url, sink, bar, is_video)
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool:
@ -663,6 +664,13 @@ instance's greatest bottleneck.
if is_video and "html" in resp.content_type:
return False
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None):
parts = content_range.split("/")
if len(parts) == 2 and parts[1].isdigit():
bar.set_total(int(parts[1]))
# Prefer the content length header
if resp.content_length:
bar.set_total(resp.content_length)
@ -742,7 +750,8 @@ instance's greatest bottleneck.
return
async with maybe_dl as (bar, sink):
content = element.title_tag.prettify()
content = "<!DOCTYPE html>\n"
content += element.title_tag.prettify()
content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8"))
sink.done()
@ -755,6 +764,8 @@ instance's greatest bottleneck.
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
self._ensure_not_seen(element, element_path)
return self._crawl_learning_module(element, maybe_cl)
@_iorepeat(3, "crawling learning module")
@ -877,6 +888,15 @@ instance's greatest bottleneck.
elem.attrs["src"] = "https:" + elem.attrs["src"]
return tag
def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
@ -947,38 +967,39 @@ instance's greatest bottleneck.
return await request.read()
raise CrawlError("get_authenticated failed even after authenticating")
# ToDo: Is iorepeat still required?
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
# fill the session with the correct cookies
params = {
"client_id": self._client_id,
"cmd": "force_login",
}
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
if self._login_type == "shibboleth":
await self._shibboleth_login.login(self.session)
else:
params = {
"client_id": self._client_id,
"cmd": "force_login",
}
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action")
if login_url is None:
raise CrawlError("Could not find the action URL in the login form!")
login_url = login_form.attrs.get("action")
if login_url is None:
raise CrawlError("Could not find the action URL in the login form!")
username, password = await self._auth.credentials()
username, password = await self._auth.credentials()
login_data = {
"username": username,
"password": password,
"cmd[doStandardAuthentication]": "Login",
}
login_data = {
"username": username,
"password": password,
"cmd[doStandardAuthentication]": "Login",
}
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read())
if not self._is_logged_in(soup):
self._auth.invalidate_credentials()
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read())
if not self._is_logged_in(soup):
self._auth.invalidate_credentials()
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:

View File

@ -15,25 +15,27 @@ TargetType = Union[str, int]
class IliasElementType(Enum):
BOOKING = "booking"
COURSE = "course"
EXERCISE = "exercise"
EXERCISE_FILES = "exercise_files" # own submitted files
TEST = "test" # an online test. Will be ignored currently.
FILE = "file"
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module"
BOOKING = "booking"
MEETING = "meeting"
SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
LINK = "link"
MEDIACAST_VIDEO = "mediacast_video"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEETING = "meeting"
MOB_VIDEO = "mob_video"
OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
SCORM_LEARNING_MODULE = "scorm_learning_module"
SURVEY = "survey"
TEST = "test" # an online test. Will be ignored currently.
@dataclass
@ -322,7 +324,7 @@ class IliasPage:
return False
def _is_personal_desktop(self) -> bool:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower()
def _is_content_page(self) -> bool:
if link := self.get_permalink():
@ -427,9 +429,14 @@ class IliasPage:
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select(".il-item-title")
titles: List[Tag] = self._soup.select("#block_pditems_0 .il-item-title")
for title in titles:
link = title.find("a")
if not link:
log.explain(f"Skipping offline item: {title.getText().strip()!r}")
continue
name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link)
@ -739,6 +746,7 @@ class IliasPage:
result += self._find_cards()
result += self._find_mediacast_videos()
result += self._find_mob_videos()
return result
@ -767,6 +775,37 @@ class IliasPage:
return videos
def _find_mob_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for figure in self._soup.select("figure.ilc_media_cont_MediaContainerHighlighted"):
title = figure.select_one("figcaption").getText().strip() + ".mp4"
video_element = figure.select_one("video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mob video '{title}'")
continue
url = None
for source in video_element.select("source"):
if source.get("type", "") == "video/mp4":
url = source.get("src")
break
if url is None:
_unexpected_html_warning()
log.warn_contd(f"No <source> element found for mob video '{title}'")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO,
url=self._abs_url_from_relative(url),
name=_sanitize_path_name(title),
mtime=None
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
@ -959,10 +998,14 @@ class IliasPage:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "grp" in icon["class"]:
return IliasElementType.FOLDER
if "webr" in icon["class"]:
return IliasElementType.LINK
if "book" in icon["class"]:
return IliasElementType.BOOKING
if "crsr" in icon["class"]:
return IliasElementType.COURSE
if "frm" in icon["class"]:
return IliasElementType.FORUM
if "sess" in icon["class"]:

View File

@ -1,23 +1,14 @@
from typing import Any, Dict, Optional, Union
from typing import Dict, Literal
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...auth import Authenticator
from ...config import Config
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError, CrawlWarning
from .async_helper import _iorepeat
from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
TargetType = Union[str, int]
from .shibboleth_login import ShibbolethLogin
_ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful():
class KitShibbolethBackgroundLoginSuccessful:
pass
@ -25,19 +16,8 @@ class KitIliasWebCrawlerSection(IliasWebCrawlerSection):
def base_url(self) -> str:
return _ILIAS_URL
def client_id(self) -> str:
# KIT ILIAS uses the Shibboleth service for authentication. There's no
# use for a client id.
return "unused"
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def login(self) -> Literal["shibboleth"]:
return "shibboleth"
class KitIliasWebCrawler(IliasWebCrawler):
@ -46,184 +26,12 @@ class KitIliasWebCrawler(IliasWebCrawler):
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator]
authenticators: Dict[str, Authenticator],
):
super().__init__(name, section, config, authenticators)
self._shibboleth_login = KitShibbolethLogin(
self._shibboleth_login = ShibbolethLogin(
_ILIAS_URL,
self._auth,
section.tfa_auth(authenticators),
)
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
class KitShibbolethLogin:
"""
Login via KIT's shibboleth system.
"""
def __init__(self, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]) -> None:
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{_ILIAS_URL}/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"csrf_token": csrf_token
}
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
"csrf_token": csrf_token
}
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
aiohttp unescapes '/' and ':' in URL query parameters which is not RFC compliant and rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling location header was a good idea??
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())

View File

@ -0,0 +1,128 @@
from typing import Any, Optional
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError
class ShibbolethLogin:
"""
Login via shibboleth system.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{self._ilias_url}/shib_login.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = form = soup.find("form", {"method": "post"})["action"]
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())

View File

@ -1,8 +1,9 @@
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@ -31,24 +32,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return re.compile(regex)
@dataclass(unsafe_hash=True)
@dataclass
class KitIpdFile:
name: str
url: str
def explain(self) -> None:
log.explain(f"File {self.name!r} (href={self.url!r})")
@dataclass
class KitIpdFolder:
name: str
files: List[KitIpdFile]
entries: List[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r} (href={file.url!r})")
def __hash__(self) -> int:
return self.name.__hash__()
for entry in self.entries:
entry.explain()
class KitIpdCrawler(HttpCrawler):
@ -72,68 +73,83 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_cl:
for item in await self._fetch_items():
item.explain()
if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item))
tasks.append(self._crawl_folder(PurePath("."), item))
else:
# Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item))
log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})")
log.explain("Attributing it to root folder")
# do this here to at least be sequential and not parallel (rate limiting is hard, as the
# crawl abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(item.url)
tasks.append(self._download_file(PurePath("."), item, etag, mtime))
await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None:
path = parent / folder.name
if not await self.crawl(path):
return
tasks = [self._download_file(path, file) for file in folder.files]
tasks = []
for entry in folder.entries:
if isinstance(entry, KitIpdFolder):
tasks.append(self._crawl_folder(path, entry))
else:
# do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
# abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(entry.url)
tasks.append(self._download_file(path, entry, etag, mtime))
await self.gather(tasks)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
async def _download_file(
self,
parent: PurePath,
file: KitIpdFile,
etag: Optional[str],
mtime: Optional[datetime]
) -> None:
element_path = parent / file.name
maybe_dl = await self.download(element_path)
prev_etag = self._get_previous_etag_from_report(element_path)
etag_differs = None if prev_etag is None else prev_etag != etag
maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
if not maybe_dl:
# keep storing the known file's etag
if prev_etag:
self._add_etag_to_report(element_path, prev_etag)
return
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
await self._stream_from_url(file.url, element_path, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
folder_tree: KitIpdFolder = KitIpdFolder(".", [])
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1)
file = self._extract_file(element, url)
return items
current_folder: KitIpdFolder = folder_tree
for folder_name in parent.parts:
# helps the type checker to verify that current_folder is indeed a folder
def subfolders() -> Generator[KitIpdFolder, Any, None]:
return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
if not any(entry.name == folder_name for entry in subfolders()):
current_folder.entries.append(KitIpdFolder(folder_name, []))
current_folder = next(entry for entry in subfolders() if entry.name == folder_name)
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
current_folder.entries.append(file)
return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
return folder_tree.entries
def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link)
@ -146,7 +162,7 @@ class KitIpdCrawler(HttpCrawler):
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
@ -159,6 +175,8 @@ class KitIpdCrawler(HttpCrawler):
sink.done()
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some

View File

@ -57,6 +57,7 @@ class OnConflict(Enum):
@dataclass
class Heuristics:
etag_differs: Optional[bool]
mtime: Optional[datetime]
@ -233,8 +234,16 @@ class OutputDirectory:
remote_newer = None
# ETag should be a more reliable indicator than mtime, so we check it first
if heuristics.etag_differs is not None:
remote_newer = heuristics.etag_differs
if remote_newer:
log.explain("Remote file's entity tag differs")
else:
log.explain("Remote file's entity tag is the same")
# Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
if remote_newer is None and heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer:
@ -366,6 +375,8 @@ class OutputDirectory:
self,
remote_path: PurePath,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
@ -375,7 +386,7 @@ class OutputDirectory:
MarkConflictError.
"""
heuristics = Heuristics(mtime)
heuristics = Heuristics(etag_differs, mtime)
redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path)

View File

@ -1,5 +1,6 @@
from pathlib import Path
from pathlib import Path, PurePath
from typing import Dict, List, Optional
from urllib.parse import quote
from rich.markup import escape
@ -168,19 +169,26 @@ class Pferd:
log.report("")
log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")
def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters
absolute_path = str(crawler.output_dir.resolve(relative_path).absolute())
absolute_path = absolute_path.replace("\\\\?\\", "")
link = f"file://{quote(absolute_path)}"
return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False
for path in sorted(crawler.report.added_files):
something_changed = True
log.report(f" [bold bright_green]Added[/] {fmt_path(path)}")
log.report(f" [bold bright_green]Added[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.changed_files):
something_changed = True
log.report(f" [bold bright_yellow]Changed[/] {fmt_path(path)}")
log.report(f" [bold bright_yellow]Changed[/] {fmt_path_link(path)}")
for path in sorted(crawler.report.deleted_files):
something_changed = True
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files):
something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path_link(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True

View File

@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.6.0"
VERSION = "3.7.0"