Compare commits

...

18 Commits

Author SHA1 Message Date
da627ff929 Bump version to 3.5.1 2024-04-09 14:28:56 +02:00
c1b592ac29 Fix ILIAS 8 file downloads truncating to zero bytes 2024-04-08 17:59:41 +02:00
eb0c956d32 Add compatibility with ILIAS 8 2024-04-05 19:08:05 +02:00
ab0cb2d956 nix: bump nixpgs dependency 2024-02-27 23:39:53 +01:00
a117126389 Fix video name deduplication 2023-12-09 23:08:42 +01:00
e9f8901520 Fix typos in ilias crawler and use set literals 2023-11-30 20:57:57 +01:00
266812f90e Move is_logged_in helper to kit_ilias_html 2023-11-16 11:19:20 +01:00
533bc27439 Bump version to 3.5.0 2023-09-13 23:13:30 +02:00
0113a0ca10 Update flake.lock 2023-09-13 22:23:36 +02:00
40f8a05ad6 Add .idea to gitignore 2023-09-13 22:23:36 +02:00
50b50513c6 Ignore SCORM learning modules 2023-08-29 13:51:19 +02:00
df3514cd03 Crawl paginated past meetings 2023-08-29 12:41:21 +02:00
ad53185247 Sanitize ascii control characters on windows 2023-08-29 12:41:15 +02:00
87b67e9271 Crawl files in the info tab 2023-08-29 12:41:15 +02:00
b54b3b979c Remove size suffix for content pages 2023-08-27 11:43:05 +02:00
2184ac8040 Add support for ILIAS mediacast listings 2023-08-27 11:43:05 +02:00
b3d412360b Add Nix flake 2023-08-26 23:54:19 +02:00
dbc2553b11 Add default show-not-deleted option
If set to `no`, PFERD won't print status or report messages for not deleted files
2023-08-26 18:43:01 +02:00
15 changed files with 471 additions and 188 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
/PFERD.egg-info/
__pycache__/
/.vscode/
/.idea/
# pyinstaller
/pferd.spec

View File

@ -22,15 +22,36 @@ ambiguous situations.
## Unreleased
## 3.5.1 - 2024-04-09
### Added
- Support for ILIAS 8
### Fixed
- Video name deduplication
## 3.5.0 - 2023-09-13
### Added
- `no-delete-prompt-override` conflict resolution strategy
- Support for ILIAS learning modules
- `show_not_deleted` option to stop printing the "Not Deleted" status or report
message. This combines nicely with the `no-delete-prompt-override` strategy,
causing PFERD to mostly ignore local-only files.
- Support for mediacast video listings
- Crawling of files in info tab
### Changed
- Remove size suffix for files in content pages
### Fixed
- Crawling of courses with the timeline view as the default tab
- Crawling of file and custom opencast cards
- Crawling of button cards without descriptions
- Abort crawling when encountering an unexpected ilias root page redirect
### Added
- `no-delete-prompt-override` conflict resolution strategy
- support for ILIAS learning modules
- Sanitize ascii control characters on Windows
- Crawling of paginated past meetings
- Ignore SCORM learning modules
## 3.4.3 - 2022-11-29

View File

@ -26,6 +26,9 @@ default values for the other sections.
`Added ...`) while running a crawler. (Default: `yes`)
- `report`: Whether PFERD should print a report of added, changed and deleted
local files for all crawlers before exiting. (Default: `yes`)
- `show_not_deleted`: Whether PFERD should print messages in status and report
when a local-only file wasn't deleted. Combines nicely with the
`no-delete-prompt-override` conflict resolution strategy.
- `share_cookies`: Whether crawlers should share cookies where applicable. For
example, some crawlers share cookies if they crawl the same website using the
same account. (Default: `yes`)
@ -75,8 +78,9 @@ common to all crawlers:
using `prompt` and always choosing "yes".
- `no-delete`: Never delete local files, but overwrite local files if the
remote file is different.
- `no-delete-prompt-overwrite`: Never delete local files, but prompt to overwrite local files if the
remote file is different.
- `no-delete-prompt-overwrite`: Never delete local files, but prompt to
overwrite local files if the remote file is different. Combines nicely
with the `show_not_deleted` option.
- `transform`: Rules for renaming and excluding certain files and directories.
For more details, see [this section](#transformation-rules). (Default: empty)
- `tasks`: The maximum number of concurrent tasks (such as crawling or

View File

@ -47,6 +47,8 @@ def configure_logging_from_args(args: argparse.Namespace) -> None:
log.output_explain = args.explain
if args.status is not None:
log.output_status = args.status
if args.show_not_deleted is not None:
log.output_not_deleted = args.show_not_deleted
if args.report is not None:
log.output_report = args.report
@ -72,6 +74,8 @@ def configure_logging_from_config(args: argparse.Namespace, config: Config) -> N
log.output_status = config.default_section.status()
if args.report is None:
log.output_report = config.default_section.report()
if args.show_not_deleted is None:
log.output_not_deleted = config.default_section.show_not_deleted()
except ConfigOptionError as e:
log.error(str(e))
sys.exit(1)

View File

@ -215,6 +215,11 @@ PARSER.add_argument(
action=BooleanOptionalAction,
help="whether crawlers should share cookies where applicable"
)
PARSER.add_argument(
"--show-not-deleted",
action=BooleanOptionalAction,
help="print messages in status and report when PFERD did not delete a local only file"
)
def load_default_section(
@ -233,6 +238,8 @@ def load_default_section(
section["report"] = "yes" if args.report else "no"
if args.share_cookies is not None:
section["share_cookies"] = "yes" if args.share_cookies else "no"
if args.show_not_deleted is not None:
section["show_not_deleted"] = "yes" if args.show_not_deleted else "no"
SUBPARSERS = PARSER.add_subparsers(title="crawlers")

View File

@ -82,6 +82,9 @@ class DefaultSection(Section):
def report(self) -> bool:
return self.s.getboolean("report", fallback=True)
def show_not_deleted(self) -> bool:
return self.s.getboolean("show_not_deleted", fallback=True)
def share_cookies(self) -> bool:
return self.s.getboolean("share_cookies", fallback=True)

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Union, cast
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
@ -17,19 +17,23 @@ TargetType = Union[str, int]
class IliasElementType(Enum):
EXERCISE = "exercise"
EXERCISE_FILES = "exercise_files" # own submitted files
TEST = "test" # an online test. Will be ignored currently.
TEST = "test" # an online test. Will be ignored currently.
FILE = "file"
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module"
BOOKING = "booking"
MEETING = "meeting"
SURVEY = "survey"
VIDEO = "video"
VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder"
VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEDIACAST_VIDEO = "mediacast_video"
OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
@dataclass
@ -45,7 +49,8 @@ class IliasPageElement:
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)"
r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
]
for regex in regexes:
@ -90,13 +95,9 @@ class IliasPage:
@staticmethod
def is_root_page(soup: BeautifulSoup) -> bool:
permalink = soup.find(id="current_perma_link")
if permalink is None:
return False
value = permalink.attrs.get("value")
if value is None:
return False
return "goto.php?target=root_" in value
if permalink := IliasPage.get_soup_permalink(soup):
return "goto.php?target=root_" in permalink
return False
def get_child_elements(self) -> List[IliasPageElement]:
"""
@ -105,9 +106,9 @@ class IliasPage:
if self._is_video_player():
log.explain("Page is a video player, extracting URL")
return self._player_to_video()
if self._is_video_listing():
log.explain("Page is a video listing, searching for elements")
return self._find_video_entries()
if self._is_opencast_video_listing():
log.explain("Page is an opencast video listing, searching for elements")
return self._find_opencast_video_entries()
if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries()
@ -117,9 +118,25 @@ class IliasPage:
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
if self._is_info_tab():
log.explain("Page is info tab, searching for elements")
return self._find_info_tab_entries()
log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = self._soup.find(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
@ -199,14 +216,18 @@ class IliasPage:
if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0]
if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED:
if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination")
return self._find_video_entries_paginated()[0]
return self._find_opencast_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected():
return self._select_content_page_url()
if self._page_type != IliasElementType.INFO_TAB:
log.explain("Selecting content tab")
return self._select_content_page_url()
else:
log.explain("Crawling info tab, skipping content select")
return None
def _is_forum_page(self) -> bool:
@ -219,7 +240,7 @@ class IliasPage:
def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup)
def _is_video_listing(self) -> bool:
def _is_opencast_video_listing(self) -> bool:
if self._is_ilias_opencast_embedding():
return True
@ -254,22 +275,23 @@ class IliasPage:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
def _is_content_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=copa_" in link.get("value")
if link := self.get_permalink():
return "target=copa_" in link
return False
def _is_learning_module_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=pg_" in link.get("value")
if link := self.get_permalink():
return "target=pg_" in link
return False
def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = self._soup.find("a", attrs={"href": lambda x: x and "crs_next_sess=1" in x})
element = self._soup.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
)
if not element:
return None
link = self._abs_url_from_link(element)
@ -278,6 +300,10 @@ class IliasPage:
def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None
def _is_info_tab(self) -> bool:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find(
id="tab_view_content",
@ -319,14 +345,14 @@ class IliasPage:
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name))
return items
@ -374,7 +400,8 @@ class IliasPage:
for link in links:
url = self._abs_url_from_link(link)
name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "")
name = _sanitize_path_name(name)
if "file_id" not in url:
_unexpected_html_warning()
@ -385,7 +412,24 @@ class IliasPage:
return items
def _find_video_entries(self) -> List[IliasPageElement]:
def _find_info_tab_entries(self) -> List[IliasPageElement]:
items = []
links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")
for link in links:
if "cmdClass=ilobjcoursegui" not in link["href"]:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.getText())
))
return items
def _find_opencast_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated
@ -405,27 +449,27 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next")
return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_video_entries_paginated()
return self._find_opencast_video_entries_paginated()
return self._find_video_entries_no_paging()
return self._find_opencast_video_entries_no_paging()
def _find_video_entries_paginated(self) -> List[IliasPageElement]:
def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
return self._find_video_entries_no_paging()
return self._find_opencast_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None:
log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
return self._find_video_entries_no_paging()
return self._find_opencast_video_entries_no_paging()
table_id = id_match.group(1)
@ -434,9 +478,9 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]
def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
@ -448,11 +492,11 @@ class IliasPage:
results: List[IliasPageElement] = []
for link in video_links:
results.append(self._listed_video_to_element(link))
results.append(self._listed_opencast_video_to_element(link))
return results
def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
@ -463,8 +507,8 @@ class IliasPage:
modification_string = link.parent.parent.parent.select_one(
f"td.std:nth-child({index})"
).getText().strip()
if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string):
modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M")
break
if modification_time is None:
@ -479,7 +523,9 @@ class IliasPage:
video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}")
return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
return IliasPageElement(
IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
)
def _find_exercise_entries(self) -> List[IliasPageElement]:
if self._soup.find(id="tab_submission"):
@ -561,7 +607,7 @@ class IliasPage:
file_listings: List[Tag] = container.findAll(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x and "cmdClass=ilexsubmissionfilegui" in x}
attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()}
)
# Add each listing as a new
@ -622,9 +668,48 @@ class IliasPage:
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
result += self._find_mediacast_videos()
return result
def _find_mediacast_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
element_name = _sanitize_path_name(
elem.select_one(".ilPlayerPreviewDescription").getText().strip()
)
if not element_name.endswith(".mp4"):
# just to make sure it has some kinda-alrightish ending
element_name = element_name + ".mp4"
video_element = elem.find(name="video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue
videos.append(IliasPageElement(
type=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")),
name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
return None
meta_tag: Tag = description_td.find_all("p")[-1]
if not meta_tag:
return None
updated_str = meta_tag.getText().strip().replace("\n", " ")
updated_str = re.sub(".+?: ", "", updated_str)
return demangle_date(updated_str)
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
"""
Returns whether a file is part of an expanded meeting.
@ -796,7 +881,7 @@ class IliasPage:
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"] or "xoct" in icon["class"]:
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
@ -817,6 +902,8 @@ class IliasPage:
return IliasElementType.SURVEY
if "file" in icon["class"]:
return IliasElementType.FILE
if "mcst" in icon["class"]:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
@ -824,9 +911,9 @@ class IliasPage:
@staticmethod
def _find_type_from_link(
element_name: str,
link_element: Tag,
url: str
element_name: str,
link_element: Tag,
url: str
) -> Optional[IliasElementType]:
"""
Decides which sub crawler to use for a given top level element.
@ -858,6 +945,12 @@ class IliasPage:
if "baseClass=ilLMPresentationGUI" in parsed_url.query:
return IliasElementType.LEARNING_MODULE
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
@ -899,7 +992,11 @@ class IliasPage:
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
if img_tag is None and found_parent.find("a", attrs={"href": lambda x: x and "crs_next_sess=" in x}):
is_session_expansion_button = found_parent.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content")
return None
@ -909,7 +1006,7 @@ class IliasPage:
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
@ -929,6 +1026,12 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_tst.svg"):
return IliasElementType.TEST
if str(img_tag["src"]).endswith("icon_mcst.svg"):
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_sahs.svg"):
return IliasElementType.SCORM_LEARNING_MODULE
return IliasElementType.FOLDER
@staticmethod
@ -958,6 +1061,37 @@ class IliasPage:
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
@staticmethod
def is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
def get_permalink(self) -> Optional[str]:
return IliasPage.get_soup_permalink(self._soup)
def _abs_url_from_link(self, link_tag: Tag) -> str:
"""
Create an absolute url from an <a> tag.
@ -970,6 +1104,13 @@ class IliasPage:
"""
return urljoin(self._page_url, relative_url)
@staticmethod
def get_soup_permalink(soup: BeautifulSoup) -> Optional[str]:
perma_link_element: Tag = soup.select_one(".il-footer-permanent-url > a")
if not perma_link_element or not perma_link_element.get("href"):
return None
return perma_link_element.get("href")
def _unexpected_html_warning() -> None:
log.warn("Encountered unexpected HTML structure, ignoring element.")
@ -993,7 +1134,7 @@ def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[dateti
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = date_str.strip()
for german, english in zip(german_months, english_months):
date_str = date_str.replace(german, english)

View File

@ -81,21 +81,25 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = set([
_DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
_VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.VIDEO,
IliasElementType.VIDEO_PLAYER,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
_VIDEO_ELEMENTS: Set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
@ -126,6 +130,7 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
raise CrawlError("Impossible return in ilias _iorepeat")
return wrapper # type: ignore
return decorator
@ -136,6 +141,10 @@ def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
return _iorepeat(1, name)
def _get_video_cache_key(element: IliasPageElement) -> str:
return f"ilias-video-cache-{element.id()}"
# Crawler control flow:
#
# crawl_desktop -+
@ -169,11 +178,11 @@ def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
class KitIliasWebCrawler(HttpCrawler):
def __init__(
self,
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator]
self,
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator]
):
# Setting a main authenticator for cookie sharing
auth = section.auth(authenticators)
@ -245,8 +254,8 @@ instance's greatest bottleneck.
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"):
perma_link = IliasPage.get_soup_permalink(soup)
if not perma_link or "crs_" not in perma_link:
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
@ -259,6 +268,8 @@ instance's greatest bottleneck.
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
@ -397,16 +408,26 @@ instance's greatest bottleneck.
"[bright_black](surveys contain no relevant data)"
)
return None
elif element.type == IliasElementType.SCORM_LEARNING_MODULE:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](scorm learning modules are not supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK:
return await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.VIDEO:
elif element.type == IliasElementType.OPENCAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO_PLAYER:
return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER:
return await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path)
else:
@ -523,7 +544,7 @@ instance's greatest bottleneck.
raise CrawlError("resolve_link_target failed even after authenticating")
async def _handle_video(
async def _handle_opencast_video(
self,
element: IliasPageElement,
element_path: PurePath,
@ -531,8 +552,8 @@ instance's greatest bottleneck.
# Copy old mapping as it is likely still relevant
if self.prev_report:
self.report.add_custom_value(
str(element_path),
self.prev_report.get_custom_value(str(element_path))
_get_video_cache_key(element),
self.prev_report.get_custom_value(_get_video_cache_key(element))
)
# A video might contain other videos, so let's "crawl" the video first
@ -542,58 +563,69 @@ instance's greatest bottleneck.
# to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# during dleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
await self.download(video)
# If we do not want to crawl it (user filter), we can move on
if not maybe_dl:
return None
# If we have every file from the cached mapping already, we can ignore this and bail
if self._all_opencast_videos_locally_present(element, maybe_dl.path):
# Mark all existing videos as known to ensure they do not get deleted during cleanup.
# We "downloaded" them, just without actually making a network request as we assumed
# they did not change.
contained = self._previous_contained_opencast_videos(element, maybe_dl.path)
if len(contained) > 1:
# Only do this if we threw away the original dl token,
# to not download single-stream videos twice
for video in contained:
await self.download(video)
return None
return self._download_video(element_path, element, maybe_dl)
return self._download_opencast_video(element, maybe_dl)
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
def _previous_contained_opencast_videos(
self, element: IliasPageElement, element_path: PurePath
) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(str(video_path))
custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element))
if not custom_value:
return []
names = cast(List[str], custom_value)
folder = video_path.parent
return [PurePath(folder, name) for name in names]
cached_value = cast(dict[str, Any], custom_value)
if "known_paths" not in cached_value or "own_path" not in cached_value:
log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}")
return []
transformed_own_path = self._transformer.transform(element_path)
if cached_value["own_path"] != str(transformed_own_path):
log.explain(
f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}"
)
return []
return [PurePath(name) for name in cached_value["known_paths"]]
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._to_local_video_path(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
def _all_opencast_videos_locally_present(self, element: IliasPageElement, element_path: PurePath) -> bool:
log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}")
if contained_videos := self._previous_contained_opencast_videos(element, element_path):
log.explain(
f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}"
)
if all(self._output_dir.resolve(path).exists() for path in contained_videos):
log.explain("Found all known videos locally, skipping enumeration request")
return True
log.explain("Missing at least one video, continuing with requests!")
else:
log.explain("No local cache present")
return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical
@_iorepeat(3, "downloading video")
async def _download_video(
self,
original_path: PurePath,
element: IliasPageElement,
dl: DownloadToken
) -> None:
stream_elements: List[IliasPageElement] = []
async def _download_opencast_video(self, element: IliasPageElement, dl: DownloadToken) -> None:
def add_to_report(paths: list[str]) -> None:
self.report.add_custom_value(
_get_video_cache_key(element),
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}
)
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
stream_elements = page.get_child_elements()
@ -604,32 +636,25 @@ instance's greatest bottleneck.
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
transformed_path = self._to_local_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
# We do not have a local cache yet
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), [original_path.name])
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))])
return
contained_video_paths: List[str] = []
for stream_element in stream_elements:
video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
video_path = dl.path.parent / stream_element.name
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
continue
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), contained_video_paths)
add_to_report(contained_video_paths)
async def _handle_file(
self,
@ -641,8 +666,8 @@ instance's greatest bottleneck.
return None
return self._download_file(element, maybe_dl)
@anoncritical
@_iorepeat(3, "downloading file")
@anoncritical
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
@ -650,12 +675,28 @@ instance's greatest bottleneck.
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool:
async with self.session.get(url, allow_redirects=is_video) as resp:
if not is_video:
# Redirect means we weren't authenticated
next_url = url
# Normal files redirect to the magazine if we are not authenticated. As files could be HTML,
# we can not match on the content type here. Instead, we disallow redirects and inspect the
# new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume
# our authentication expired.
if not is_video:
async with self.session.get(url, allow_redirects=False) as resp:
# Redirect to anything except a "sendfile" means we weren't authenticated
if hdrs.LOCATION in resp.headers:
return False
# we wanted a video but got HTML
if "&cmd=sendfile" not in resp.headers[hdrs.LOCATION]:
return False
# Directly follow the redirect to not make a second, unnecessary request
next_url = resp.headers[hdrs.LOCATION]
# Let's try this again and follow redirects
return await fetch_follow_redirects(next_url)
async def fetch_follow_redirects(file_url: str) -> bool:
async with self.session.get(file_url) as resp:
# We wanted a video but got HTML => Forbidden, auth expired. Logging in won't really
# solve that depending on the setup, but it is better than nothing.
if is_video and "html" in resp.content_type:
return False
@ -700,7 +741,7 @@ instance's greatest bottleneck.
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, None)
page = IliasPage(soup, next_stage_url, element)
if next := page.get_next_stage_element():
next_stage_url = next.url
@ -712,7 +753,6 @@ instance's greatest bottleneck.
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
elements = []
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
@ -763,14 +803,14 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, None)
page = IliasPage(soup, element.url, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left"
cl.path, next.previous_url, "left", element
))
elements.append(next)
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.next_url, "right"
cl.path, next.next_url, "right", element
))
# Reflect their natural ordering in the file names
@ -792,7 +832,8 @@ instance's greatest bottleneck.
self,
path: PurePath,
start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]]
dir: Union[Literal["left"], Literal["right"]],
parent_element: IliasPageElement
) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = []
@ -805,7 +846,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, None)
page = IliasPage(soup, next_element_url, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
@ -877,7 +918,7 @@ instance's greatest bottleneck.
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
soup = soupify(await request.read())
if self._is_logged_in(soup):
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
# We weren't authenticated, so try to do that
@ -886,11 +927,12 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request:
soup = soupify(await request.read())
if self._is_logged_in(soup):
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
raise CrawlError("get_page failed even after authenticating")
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
def _verify_page(self, soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
@staticmethod
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError(
"Unexpectedly encountered ILIAS root page. "
@ -944,38 +986,10 @@ instance's greatest bottleneck.
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@ _iorepeat(3, "Login", failure_is_error=True)
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
@ staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
class KitShibbolethLogin:
"""
@ -1055,9 +1069,9 @@ class KitShibbolethLogin:
await sess.post(url, data=data)
async def _authenticate_tfa(
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
@ -1122,7 +1136,7 @@ async def _shib_post(
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still still has a valid session, it will directly respond to the request
# If shib still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())

View File

@ -14,7 +14,7 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:
class Deduplicator:
FORBIDDEN_CHARS = '<>:"/\\|?*'
FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)])
FORBIDDEN_NAMES = {
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",

View File

@ -59,6 +59,7 @@ class Log:
# Whether different parts of the output are enabled or disabled
self.output_explain = False
self.output_status = True
self.output_not_deleted = True
self.output_report = True
def _update_live(self) -> None:
@ -207,6 +208,17 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def not_deleted(self, style: str, action: str, text: str, suffix: str = "") -> None:
"""
Print a message for a local only file that wasn't
deleted while crawling. Allows markup in the "style"
argument which will be applied to the "action" string.
"""
if self.output_status and self.output_not_deleted:
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def report(self, text: str) -> None:
"""
Print a report after crawling. Allows markup.
@ -215,6 +227,14 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_report:
self.print(text)
def report_not_deleted(self, text: str) -> None:
"""
Print a report for a local only file that wasn't deleted after crawling. Allows markup.
"""
if self.output_report and self.output_not_deleted:
self.print(text)
@contextmanager
def _bar(
self,

View File

@ -496,7 +496,7 @@ class OutputDirectory:
except OSError:
pass
else:
log.status("[bold bright_magenta]", "Not deleted", fmt_path(pure))
log.not_deleted("[bold bright_magenta]", "Not deleted", fmt_path(pure))
self._report.not_delete_file(pure)
def load_prev_report(self) -> None:

View File

@ -180,7 +180,7 @@ class Pferd:
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files):
something_changed = True
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True

View File

@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.4.3"
VERSION = "3.5.1"

27
flake.lock generated Normal file
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1708979614,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

41
flake.nix Normal file
View File

@ -0,0 +1,41 @@
{
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
};
outputs = { self, nixpkgs }:
let
# Helper function to generate an attrset '{ x86_64-linux = f "x86_64-linux"; ... }'.
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed;
in
{
packages = forAllSystems (system:
let pkgs = import nixpkgs { inherit system; };
in
rec {
default = pkgs.python3Packages.buildPythonApplication rec {
pname = "pferd";
# Performing black magic
# Don't worry, I sacrificed enough goats for the next few years
version = (pkgs.lib.importTOML ./PFERD/version.py).VERSION;
format = "pyproject";
src = ./.;
nativeBuildInputs = with pkgs.python3Packages; [
setuptools
];
propagatedBuildInputs = with pkgs.python3Packages; [
aiohttp
beautifulsoup4
rich
keyring
certifi
];
};
});
};
}