Compare commits


10 Commits

8 changed files with 163 additions and 129 deletions

View File

@@ -22,6 +22,20 @@ ambiguous situations.
 
 ## Unreleased
 
+## 3.5.2 - 2024-04-14
+
+### Fixed
+- Crawling of personal desktop with ILIAS 8
+- Crawling of empty personal desktops
+
+## 3.5.1 - 2024-04-09
+
+### Added
+- Support for ILIAS 8
+
+### Fixed
+- Video name deduplication
+
 ## 3.5.0 - 2023-09-13
 
 ### Added

View File

@@ -92,9 +92,6 @@ common to all crawlers:
   load for the crawl target. (Default: `0.0`)
 - `windows_paths`: Whether PFERD should find alternative names for paths that
   are invalid on Windows. (Default: `yes` on Windows, `no` otherwise)
-- `aliases`: List of strings that are considered as an alias when invoking with
-  the `--crawler` or `-C` flag. If there is more than one crawl section with
-  the same aliases all are selected. Thereby, you can group different crawlers.
 
 Some crawlers may also require credentials for authentication. To configure how
 the crawler obtains its credentials, the `auth` option is used. It is set to the
@@ -109,7 +106,6 @@ username = foo
 password = bar
 
 [crawl:something]
-aliases = [sth, some]
 type = some-complex-crawler
 auth = auth:example
 on_conflict = no-delete

View File

@@ -17,7 +17,7 @@ TargetType = Union[str, int]
 class IliasElementType(Enum):
     EXERCISE = "exercise"
     EXERCISE_FILES = "exercise_files"  # own submitted files
-    TEST = "test"  # an online test. Will be ignored currently.
+    TEST = "test"              # an online test. Will be ignored currently.
     FILE = "file"
     FOLDER = "folder"
     FORUM = "forum"
@@ -95,13 +95,9 @@ class IliasPage:
     @staticmethod
     def is_root_page(soup: BeautifulSoup) -> bool:
-        permalink = soup.find(id="current_perma_link")
-        if permalink is None:
-            return False
-        value = permalink.attrs.get("value")
-        if value is None:
-            return False
-        return "goto.php?target=root_" in value
+        if permalink := IliasPage.get_soup_permalink(soup):
+            return "goto.php?target=root_" in permalink
+        return False
 
     def get_child_elements(self) -> List[IliasPageElement]:
         """
@@ -279,16 +275,14 @@
         return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
 
     def _is_content_page(self) -> bool:
-        link = self._soup.find(id="current_perma_link")
-        if not link:
-            return False
-        return "target=copa_" in link.get("value")
+        if link := self.get_permalink():
+            return "target=copa_" in link
+        return False
 
     def _is_learning_module_page(self) -> bool:
-        link = self._soup.find(id="current_perma_link")
-        if not link:
-            return False
-        return "target=pg_" in link.get("value")
+        if link := self.get_permalink():
+            return "target=pg_" in link
+        return False
 
     def _contains_collapsed_future_meetings(self) -> bool:
         return self._uncollapse_future_meetings_url() is not None
@@ -384,6 +378,10 @@
             name = _sanitize_path_name(link.text.strip())
             url = self._abs_url_from_link(link)
 
+            if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url:
+                # Configure button/link does not have anything interesting
+                continue
+
             type = self._find_type_from_link(name, link, url)
             if not type:
                 _unexpected_html_warning()
@@ -513,8 +511,8 @@
             modification_string = link.parent.parent.parent.select_one(
                 f"td.std:nth-child({index})"
             ).getText().strip()
-            if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
-                modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
+            if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string):
+                modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M")
                 break
 
         if modification_time is None:
@@ -613,7 +611,7 @@
         file_listings: List[Tag] = container.findAll(
             name="a",
             # download links contain the given command class
-            attrs={"href": lambda x: x and "cmdClass=ilexsubmissionfilegui" in x}
+            attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()}
         )
 
         # Add each listing as a new
@@ -917,9 +915,9 @@
     @staticmethod
     def _find_type_from_link(
-        element_name: str,
-        link_element: Tag,
-        url: str
+            element_name: str,
+            link_element: Tag,
+            url: str
     ) -> Optional[IliasElementType]:
         """
         Decides which sub crawler to use for a given top level element.
@@ -1080,6 +1078,14 @@
         if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
             return True
 
+        # Empty personal desktop has zero (0) markers. Match on the text...
+        if alert := soup.select_one(".alert-info"):
+            text = alert.getText().lower()
+            if "you have not yet selected any favourites" in text:
+                return True
+            if "sie haben aktuell noch keine favoriten ausgewählt" in text:
+                return True
+
         # Video listing embeds do not have complete ILIAS html. Try to match them by
         # their video listing table
         video_table = soup.find(
@@ -1095,6 +1101,9 @@
             return True
         return False
 
+    def get_permalink(self) -> Optional[str]:
+        return IliasPage.get_soup_permalink(self._soup)
+
     def _abs_url_from_link(self, link_tag: Tag) -> str:
         """
         Create an absolute url from an <a> tag.
@@ -1107,6 +1116,13 @@
         """
         return urljoin(self._page_url, relative_url)
 
+    @staticmethod
+    def get_soup_permalink(soup: BeautifulSoup) -> Optional[str]:
+        perma_link_element: Tag = soup.select_one(".il-footer-permanent-url > a")
+        if not perma_link_element or not perma_link_element.get("href"):
+            return None
+        return perma_link_element.get("href")
+
 
 def _unexpected_html_warning() -> None:
     log.warn("Encountered unexpected HTML structure, ignoring element.")
@@ -1130,7 +1146,7 @@ def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[dateti
         date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
         date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
         date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
         date_str = date_str.strip()
         for german, english in zip(german_months, english_months):
             date_str = date_str.replace(german, english)
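Note for reviewers: all permalink lookups in this file now funnel through the new get_soup_permalink helper, which reads the ILIAS 8 footer link instead of the old "current_perma_link" input. A minimal standalone sketch of that pattern, runnable outside PFERD with BeautifulSoup; the sample HTML and the "html.parser" choice are illustrative assumptions, not taken from the patch:

from typing import Optional

from bs4 import BeautifulSoup, Tag


def get_soup_permalink(soup: BeautifulSoup) -> Optional[str]:
    # ILIAS 8 renders the permanent link in the page footer, hence the CSS selector.
    perma_link_element: Optional[Tag] = soup.select_one(".il-footer-permanent-url > a")
    if not perma_link_element or not perma_link_element.get("href"):
        return None
    return perma_link_element.get("href")


def is_root_page(soup: BeautifulSoup) -> bool:
    # Same walrus-based shape as the rewritten checks in the diff above.
    if permalink := get_soup_permalink(soup):
        return "goto.php?target=root_" in permalink
    return False


sample = '<footer><div class="il-footer-permanent-url"><a href="https://ilias.example/goto.php?target=root_1">Permalink</a></div></footer>'
print(is_root_page(BeautifulSoup(sample, "html.parser")))  # True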

View File

@@ -81,7 +81,7 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
         return self.s.getboolean("forums", fallback=False)
 
 
-_DIRECTORY_PAGES: Set[IliasElementType] = set([
+_DIRECTORY_PAGES: Set[IliasElementType] = {
     IliasElementType.EXERCISE,
     IliasElementType.EXERCISE_FILES,
     IliasElementType.FOLDER,
@@ -90,16 +90,16 @@ _DIRECTORY_PAGES: Set[IliasElementType] = set([
     IliasElementType.MEDIACAST_VIDEO_FOLDER,
     IliasElementType.OPENCAST_VIDEO_FOLDER,
     IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
-])
+}
 
-_VIDEO_ELEMENTS: Set[IliasElementType] = set([
+_VIDEO_ELEMENTS: Set[IliasElementType] = {
     IliasElementType.MEDIACAST_VIDEO_FOLDER,
     IliasElementType.MEDIACAST_VIDEO,
     IliasElementType.OPENCAST_VIDEO,
     IliasElementType.OPENCAST_VIDEO_PLAYER,
     IliasElementType.OPENCAST_VIDEO_FOLDER,
     IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
-])
+}
 
 
 def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
@@ -130,6 +130,7 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
             raise CrawlError("Impossible return in ilias _iorepeat")
 
         return wrapper  # type: ignore
+
     return decorator
@@ -140,6 +141,10 @@ def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
     return _iorepeat(1, name)
 
 
+def _get_video_cache_key(element: IliasPageElement) -> str:
+    return f"ilias-video-cache-{element.id()}"
+
+
 # Crawler control flow:
 #
 # crawl_desktop -+
@@ -173,11 +178,11 @@ def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
 class KitIliasWebCrawler(HttpCrawler):
     def __init__(
-        self,
-        name: str,
-        section: KitIliasWebCrawlerSection,
-        config: Config,
-        authenticators: Dict[str, Authenticator]
+            self,
+            name: str,
+            section: KitIliasWebCrawlerSection,
+            config: Config,
+            authenticators: Dict[str, Authenticator]
     ):
         # Setting a main authenticator for cookie sharing
         auth = section.auth(authenticators)
@@ -223,7 +228,7 @@ instance's greatest bottleneck.
         await self._crawl_url(root_url, expected_id=course_id)
 
     async def _crawl_desktop(self) -> None:
-        appendix = r"ILIAS\PersonalDesktop\PDMainBarProvider|mm_pd_sel_items"
+        appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items"
         appendix = appendix.encode("ASCII").hex()
         await self._crawl_url(self._base_url + "/gs_content.php?item=" + appendix)
 
@@ -249,8 +254,8 @@ instance's greatest bottleneck.
                     soup = await self._get_page(next_stage_url, root_page_allowed=True)
 
                     if current_parent is None and expected_id is not None:
-                        perma_link_element: Tag = soup.find(id="current_perma_link")
-                        if not perma_link_element or "crs_" not in perma_link_element.get("value"):
+                        perma_link = IliasPage.get_soup_permalink(soup)
+                        if not perma_link or "crs_" not in perma_link:
                             raise CrawlError("Invalid course id? Didn't find anything looking like a course")
 
                     log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
@@ -547,8 +552,8 @@ instance's greatest bottleneck.
         # Copy old mapping as it is likely still relevant
         if self.prev_report:
             self.report.add_custom_value(
-                str(element_path),
-                self.prev_report.get_custom_value(str(element_path))
+                _get_video_cache_key(element),
+                self.prev_report.get_custom_value(_get_video_cache_key(element))
             )
 
         # A video might contain other videos, so let's "crawl" the video first
@@ -558,58 +563,69 @@ instance's greatest bottleneck.
         # to ensure backwards compatibility.
         maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
 
-        # If we do not want to crawl it (user filter) or we have every file
-        # from the cached mapping already, we can ignore this and bail
-        if not maybe_dl or self._all_opencast_videos_locally_present(element_path):
-            # Mark all existing cideos as known so they do not get deleted
-            # during dleanup. We "downloaded" them, just without actually making
-            # a network request as we assumed they did not change.
-            for video in self._previous_contained_opencast_videos(element_path):
-                await self.download(video)
+        # If we do not want to crawl it (user filter), we can move on
+        if not maybe_dl:
+            return None
+
+        # If we have every file from the cached mapping already, we can ignore this and bail
+        if self._all_opencast_videos_locally_present(element, maybe_dl.path):
+            # Mark all existing videos as known to ensure they do not get deleted during cleanup.
+            # We "downloaded" them, just without actually making a network request as we assumed
+            # they did not change.
+            contained = self._previous_contained_opencast_videos(element, maybe_dl.path)
+            if len(contained) > 1:
+                # Only do this if we threw away the original dl token,
+                # to not download single-stream videos twice
+                for video in contained:
+                    await self.download(video)
 
             return None
 
-        return self._download_opencast_video(element_path, element, maybe_dl)
+        return self._download_opencast_video(element, maybe_dl)
 
-    def _previous_contained_opencast_videos(self, video_path: PurePath) -> List[PurePath]:
+    def _previous_contained_opencast_videos(
+            self, element: IliasPageElement, element_path: PurePath
+    ) -> List[PurePath]:
         if not self.prev_report:
             return []
-        custom_value = self.prev_report.get_custom_value(str(video_path))
+        custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element))
         if not custom_value:
             return []
-        names = cast(List[str], custom_value)
-        folder = video_path.parent
-        return [PurePath(folder, name) for name in names]
+        cached_value = cast(dict[str, Any], custom_value)
+        if "known_paths" not in cached_value or "own_path" not in cached_value:
+            log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}")
+            return []
+        transformed_own_path = self._transformer.transform(element_path)
+        if cached_value["own_path"] != str(transformed_own_path):
+            log.explain(
+                f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}"
+            )
+            return []
+        return [PurePath(name) for name in cached_value["known_paths"]]
 
-    def _all_opencast_videos_locally_present(self, video_path: PurePath) -> bool:
-        if contained_videos := self._previous_contained_opencast_videos(video_path):
-            log.explain_topic(f"Checking local cache for video {video_path.name}")
-            all_found_locally = True
-            for video in contained_videos:
-                transformed_path = self._to_local_opencast_video_path(video)
-                if transformed_path:
-                    exists_locally = self._output_dir.resolve(transformed_path).exists()
-                    all_found_locally = all_found_locally and exists_locally
-            if all_found_locally:
-                log.explain("Found all videos locally, skipping enumeration request")
+    def _all_opencast_videos_locally_present(self, element: IliasPageElement, element_path: PurePath) -> bool:
+        log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}")
+        if contained_videos := self._previous_contained_opencast_videos(element, element_path):
+            log.explain(
+                f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}"
+            )
+            if all(self._output_dir.resolve(path).exists() for path in contained_videos):
+                log.explain("Found all known videos locally, skipping enumeration request")
                 return True
             log.explain("Missing at least one video, continuing with requests!")
+        else:
+            log.explain("No local cache present")
         return False
 
-    def _to_local_opencast_video_path(self, path: PurePath) -> Optional[PurePath]:
-        if transformed := self._transformer.transform(path):
-            return self._deduplicator.fixup_path(transformed)
-        return None
-
     @anoncritical
     @_iorepeat(3, "downloading video")
-    async def _download_opencast_video(
-        self,
-        original_path: PurePath,
-        element: IliasPageElement,
-        dl: DownloadToken
-    ) -> None:
-        stream_elements: List[IliasPageElement] = []
+    async def _download_opencast_video(self, element: IliasPageElement, dl: DownloadToken) -> None:
+        def add_to_report(paths: list[str]) -> None:
+            self.report.add_custom_value(
+                _get_video_cache_key(element),
+                {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}
+            )
 
         async with dl as (bar, sink):
             page = IliasPage(await self._get_page(element.url), element.url, element)
             stream_elements = page.get_child_elements()
@@ -620,32 +636,25 @@ instance's greatest bottleneck.
             log.explain(f"Using single video mode for {element.name}")
             stream_element = stream_elements[0]
 
-            transformed_path = self._to_local_opencast_video_path(original_path)
-            if not transformed_path:
-                raise CrawlError(f"Download returned a path but transform did not for {original_path}")
-
             # We do not have a local cache yet
-            if self._output_dir.resolve(transformed_path).exists():
-                log.explain(f"Video for {element.name} existed locally")
-            else:
-                await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
-            self.report.add_custom_value(str(original_path), [original_path.name])
+            await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
+            add_to_report([str(self._transformer.transform(dl.path))])
             return
 
         contained_video_paths: List[str] = []
 
         for stream_element in stream_elements:
-            video_path = original_path.parent / stream_element.name
-            contained_video_paths.append(str(video_path))
+            video_path = dl.path.parent / stream_element.name
 
             maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
             if not maybe_dl:
                 continue
             async with maybe_dl as (bar, sink):
                 log.explain(f"Streaming video from real url {stream_element.url}")
+                contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
                 await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
 
-        self.report.add_custom_value(str(original_path), contained_video_paths)
+        add_to_report(contained_video_paths)
 
     async def _handle_file(
             self,
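Reviewer note on the new cache layout used in the two hunks above: each opencast element now owns one report entry keyed by _get_video_cache_key(element), whose value is a dict with "known_paths" and "own_path". A rough sketch of that shape and the staleness check, using a plain dict in place of PFERD's report object; the element id and paths are made-up examples:

from pathlib import PurePath, PurePosixPath
from typing import Any, Dict, List, Optional


def video_cache_key(element_id: str) -> str:
    # Mirrors _get_video_cache_key: one report entry per ILIAS element id.
    return f"ilias-video-cache-{element_id}"


def cached_video_paths(report: Dict[str, Any], element_id: str, own_path: PurePath) -> List[PurePath]:
    cached: Optional[Dict[str, Any]] = report.get(video_cache_key(element_id))
    if not cached or "known_paths" not in cached or "own_path" not in cached:
        return []
    # A changed own_path means the folder was transformed to a different location
    # since the last run, so the cached child paths can no longer be trusted.
    if cached["own_path"] != str(own_path):
        return []
    return [PurePath(name) for name in cached["known_paths"]]


report = {
    video_cache_key("xyz123"): {
        "own_path": "Videos/Lecture 01",
        "known_paths": ["Videos/Lecture 01/camera.mp4", "Videos/Lecture 01/slides.mp4"],
    }
}
print(cached_video_paths(report, "xyz123", PurePosixPath("Videos/Lecture 01")))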
@@ -657,8 +666,8 @@ instance's greatest bottleneck.
             return None
         return self._download_file(element, maybe_dl)
 
+    @anoncritical
     @_iorepeat(3, "downloading file")
-    @anoncritical
     async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
         assert dl  # The function is only reached when dl is not None
         async with dl as (bar, sink):
@@ -666,12 +675,28 @@ instance's greatest bottleneck.
 
     async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
         async def try_stream() -> bool:
-            async with self.session.get(url, allow_redirects=is_video) as resp:
-                if not is_video:
-                    # Redirect means we weren't authenticated
+            next_url = url
+
+            # Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
+            # we can not match on the content type here. Instead, we disallow redirects and inspect the
+            # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
+            # our authentication expired.
+            if not is_video:
+                async with self.session.get(url, allow_redirects=False) as resp:
+                    # Redirect to anything except a "sendfile" means we weren't authenticated
                     if hdrs.LOCATION in resp.headers:
-                        return False
-                # we wanted a video but got HTML
+                        if "&cmd=sendfile" not in resp.headers[hdrs.LOCATION]:
+                            return False
+                        # Directly follow the redirect to not make a second, unnecessary request
+                        next_url = resp.headers[hdrs.LOCATION]
+
+            # Let's try this again and follow redirects
+            return await fetch_follow_redirects(next_url)
+
+        async def fetch_follow_redirects(file_url: str) -> bool:
+            async with self.session.get(file_url) as resp:
+                # We wanted a video but got HTML => Forbidden, auth expired. Logging in won't really
+                # solve that depending on the setup, but it is better than nothing.
                 if is_video and "html" in resp.content_type:
                     return False
 
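For context on the redirect handling above, here is the same idea reduced to a self-contained aiohttp sketch; the function name file_url_still_authenticated is invented for this example and is not part of PFERD, and the URL you pass in would be a real ILIAS file link:

import aiohttp
from aiohttp import hdrs


async def file_url_still_authenticated(session: aiohttp.ClientSession, url: str) -> bool:
    # Ask for the file without following redirects and inspect the Location header,
    # mirroring the try_stream() logic in the hunk above.
    async with session.get(url, allow_redirects=False) as resp:
        location = resp.headers.get(hdrs.LOCATION)
        if location is None:
            # No redirect at all: the server answered the file request directly.
            return True
        # ILIAS 8 redirects authenticated file requests to a "sendfile" command;
        # a redirect anywhere else (e.g. the login page) means the session expired.
        return "&cmd=sendfile" in location

In the patch this decision additionally feeds the redirect target back into a second request (fetch_follow_redirects) so the file is not fetched twice.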
@@ -728,7 +753,6 @@ instance's greatest bottleneck.
             raise CrawlWarning("Failed to extract forum data")
         if download_data.empty:
             log.explain("Forum had no threads")
-            elements = []
             return
         html = await self._post_authenticated(download_data.url, download_data.form_data)
         elements = parse_ilias_forum_export(soupify(html))
@@ -962,7 +986,7 @@ instance's greatest bottleneck.
 
     # We repeat this as the login method in shibboleth doesn't handle I/O errors.
    # Shibboleth is quite reliable as well, the repeat is likely not critical here.
-    @ _iorepeat(3, "Login", failure_is_error=True)
+    @_iorepeat(3, "Login", failure_is_error=True)
     async def _authenticate(self) -> None:
         await self._shibboleth_login.login(self.session)
 
@@ -1045,9 +1069,9 @@ class KitShibbolethLogin:
         await sess.post(url, data=data)
 
     async def _authenticate_tfa(
-        self,
-        session: aiohttp.ClientSession,
-        soup: BeautifulSoup
+            self,
+            session: aiohttp.ClientSession,
+            soup: BeautifulSoup
     ) -> BeautifulSoup:
         if not self._tfa_auth:
             self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
@@ -1112,7 +1136,7 @@ async def _shib_post(
     async with session.get(correct_url, allow_redirects=False) as response:
         location = response.headers.get("location")
         log.explain(f"Redirected to {location!r} with status {response.status}")
-        # If shib still still has a valid session, it will directly respond to the request
+        # If shib still has a valid session, it will directly respond to the request
         if location is None:
             log.explain("Shib recognized us, returning its response directly")
             return soupify(await response.read())

View File

@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional
 
 from rich.markup import escape
 
@@ -43,24 +43,16 @@ class Pferd:
         crawl_sections = [name for name, _ in config.crawl_sections()]
 
-        crawlers_to_run = set()  # With crawl: prefix
+        crawlers_to_run = []  # With crawl: prefix
         unknown_names = []  # Without crawl: prefix
 
         for name in cli_crawlers:
             section_name = f"crawl:{name}"
             if section_name in crawl_sections:
                 log.explain(f"Crawler section named {section_name!r} exists")
-                crawlers_to_run.add(section_name)
-            # interprete name as alias of a crawler
-            alias_names = self._find_crawlers_by_alias(name, config)
-            if alias_names:
-                crawlers_to_run.update(alias_names)
-                log.explain_topic(f"Crawler alias {name!r} found corresponding crawler sections:")
-                for alias_name in alias_names:
-                    log.explain(f"Crawler section named {alias_name!r} with alias {name!r} exists")
-            if not section_name in crawl_sections and not alias_names:
-                log.explain(f"There's neither a crawler section named {section_name!r} nor does a crawler with alias {name!r} exist.")
+                crawlers_to_run.append(section_name)
+            else:
+                log.explain(f"There's no crawler section named {section_name!r}")
                 unknown_names.append(name)
 
         if unknown_names:
@@ -73,14 +65,6 @@ class Pferd:
         return crawlers_to_run
 
-    def _find_crawlers_by_alias(self, alias: str, config: Config) -> Set[str]:
-        alias_names = set()
-        for (section_name, section) in config.crawl_sections():
-            section_aliases = section.get("aliases", [])
-            if alias in section_aliases:
-                alias_names.add(section_name)
-        return alias_names
-
     def _find_crawlers_to_run(
             self,
             config: Config,

View File

@@ -1,2 +1,2 @@
 NAME = "PFERD"
-VERSION = "3.5.0"
+VERSION = "3.5.2"

flake.lock (generated)
View File

@@ -2,16 +2,16 @@
   "nodes": {
     "nixpkgs": {
       "locked": {
-        "lastModified": 1694499547,
-        "narHash": "sha256-R7xMz1Iia6JthWRHDn36s/E248WB1/je62ovC/dUVKI=",
+        "lastModified": 1708979614,
+        "narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "e5f018cf150e29aac26c61dac0790ea023c46b24",
+        "rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a",
         "type": "github"
       },
       "original": {
         "owner": "NixOS",
-        "ref": "nixos-23.05",
+        "ref": "nixos-23.11",
         "repo": "nixpkgs",
         "type": "github"
       }

View File

@@ -2,7 +2,7 @@
   description = "Tool for downloading course-related files from ILIAS";
 
   inputs = {
-    nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
+    nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
   };
 
   outputs = { self, nixpkgs }: