Compare commits

..

4 Commits

Author SHA1 Message Date
29251fa003 Merge 77c1f1516c into e41a22149e 2023-08-26 17:16:00 +02:00
e41a22149e Add default show-not-deleted option
If set to `no`, PFERD won't print status or report messages for not deleted files
2023-08-26 17:13:45 +02:00
77c1f1516c Used proper plural 2021-11-02 12:41:40 +01:00
9e12e96d90 Added alias functionality 2021-11-02 03:42:08 +01:00
11 changed files with 174 additions and 371 deletions

1
.gitignore vendored
View File

@ -3,7 +3,6 @@
/PFERD.egg-info/ /PFERD.egg-info/
__pycache__/ __pycache__/
/.vscode/ /.vscode/
/.idea/
# pyinstaller # pyinstaller
/pferd.spec /pferd.spec

View File

@ -22,31 +22,18 @@ ambiguous situations.
## Unreleased ## Unreleased
### Fixed
- Video name deduplication
## 3.5.0 - 2023-09-13
### Added
- `no-delete-prompt-override` conflict resolution strategy
- Support for ILIAS learning modules
- `show_not_deleted` option to stop printing the "Not Deleted" status or report
message. This combines nicely with the `no-delete-prompt-override` strategy,
causing PFERD to mostly ignore local-only files.
- Support for mediacast video listings
- Crawling of files in info tab
### Changed
- Remove size suffix for files in content pages
### Fixed ### Fixed
- Crawling of courses with the timeline view as the default tab - Crawling of courses with the timeline view as the default tab
- Crawling of file and custom opencast cards - Crawling of file and custom opencast cards
- Crawling of button cards without descriptions - Crawling of button cards without descriptions
- Abort crawling when encountering an unexpected ilias root page redirect - Abort crawling when encountering an unexpected ilias root page redirect
- Sanitize ascii control characters on Windows
- Crawling of paginated past meetings ### Added
- Ignore SCORM learning modules - `no-delete-prompt-override` conflict resolution strategy
- support for ILIAS learning modules
- `show_not_deleted` option to stop printing the "Not Deleted" status or report
message. This combines nicely with the `no-delete-prompt-override` strategy,
causing PFERD to mostly ignore local-only files.
## 3.4.3 - 2022-11-29 ## 3.4.3 - 2022-11-29

View File

@ -92,6 +92,9 @@ common to all crawlers:
load for the crawl target. (Default: `0.0`) load for the crawl target. (Default: `0.0`)
- `windows_paths`: Whether PFERD should find alternative names for paths that - `windows_paths`: Whether PFERD should find alternative names for paths that
are invalid on Windows. (Default: `yes` on Windows, `no` otherwise) are invalid on Windows. (Default: `yes` on Windows, `no` otherwise)
- `aliases`: List of strings that are considered as an alias when invoking with
the `--crawler` or `-C` flag. If there is more than one crawl section with
the same aliases all are selected. Thereby, you can group different crawlers.
Some crawlers may also require credentials for authentication. To configure how Some crawlers may also require credentials for authentication. To configure how
the crawler obtains its credentials, the `auth` option is used. It is set to the the crawler obtains its credentials, the `auth` option is used. It is set to the
@ -106,6 +109,7 @@ username = foo
password = bar password = bar
[crawl:something] [crawl:something]
aliases = [sth, some]
type = some-complex-crawler type = some-complex-crawler
auth = auth:example auth = auth:example
on_conflict = no-delete on_conflict = no-delete

View File

@ -241,5 +241,4 @@ def load_default_section(
if args.show_not_deleted is not None: if args.show_not_deleted is not None:
section["show_not_deleted"] = "yes" if args.show_not_deleted else "no" section["show_not_deleted"] = "yes" if args.show_not_deleted else "no"
SUBPARSERS = PARSER.add_subparsers(title="crawlers") SUBPARSERS = PARSER.add_subparsers(title="crawlers")

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from enum import Enum from enum import Enum
from typing import Dict, List, Optional, Union, cast from typing import Dict, List, Optional, Union
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -22,18 +22,14 @@ class IliasElementType(Enum):
FOLDER = "folder" FOLDER = "folder"
FORUM = "forum" FORUM = "forum"
LINK = "link" LINK = "link"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module" LEARNING_MODULE = "learning_module"
BOOKING = "booking" BOOKING = "booking"
MEETING = "meeting" MEETING = "meeting"
SURVEY = "survey" SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module" VIDEO = "video"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" VIDEO_PLAYER = "video_player"
MEDIACAST_VIDEO = "mediacast_video" VIDEO_FOLDER = "video_folder"
OPENCAST_VIDEO = "opencast_video" VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
@dataclass @dataclass
@ -49,8 +45,7 @@ class IliasPageElement:
r"eid=(?P<id>[0-9a-z\-]+)", r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)", r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)", r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)", r"target=[a-z]+_(?P<id>\d+)"
r"mm_(?P<id>\d+)"
] ]
for regex in regexes: for regex in regexes:
@ -110,9 +105,9 @@ class IliasPage:
if self._is_video_player(): if self._is_video_player():
log.explain("Page is a video player, extracting URL") log.explain("Page is a video player, extracting URL")
return self._player_to_video() return self._player_to_video()
if self._is_opencast_video_listing(): if self._is_video_listing():
log.explain("Page is an opencast video listing, searching for elements") log.explain("Page is a video listing, searching for elements")
return self._find_opencast_video_entries() return self._find_video_entries()
if self._is_exercise_file(): if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements") log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries() return self._find_exercise_entries()
@ -122,25 +117,9 @@ class IliasPage:
if self._is_content_page(): if self._is_content_page():
log.explain("Page is a content page, searching for elements") log.explain("Page is a content page, searching for elements")
return self._find_copa_entries() return self._find_copa_entries()
if self._is_info_tab():
log.explain("Page is info tab, searching for elements")
return self._find_info_tab_entries()
log.explain("Page is a normal folder, searching for elements") log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries() return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = self._soup.find(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]: def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool: def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
@ -220,18 +199,14 @@ class IliasPage:
if self._is_ilias_opencast_embedding(): if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding") log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0] return self.get_child_elements()[0]
if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED: if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination") log.explain("Unwrapping video pagination")
return self._find_opencast_video_entries_paginated()[0] return self._find_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings(): if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings") log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url() return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected(): if not self._is_content_tab_selected():
if self._page_type != IliasElementType.INFO_TAB:
log.explain("Selecting content tab")
return self._select_content_page_url() return self._select_content_page_url()
else:
log.explain("Crawling info tab, skipping content select")
return None return None
def _is_forum_page(self) -> bool: def _is_forum_page(self) -> bool:
@ -244,7 +219,7 @@ class IliasPage:
def _is_video_player(self) -> bool: def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup) return "paella_config_file" in str(self._soup)
def _is_opencast_video_listing(self) -> bool: def _is_video_listing(self) -> bool:
if self._is_ilias_opencast_embedding(): if self._is_ilias_opencast_embedding():
return True return True
@ -294,10 +269,7 @@ class IliasPage:
return self._uncollapse_future_meetings_url() is not None return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]: def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = self._soup.find( element = self._soup.find("a", attrs={"href": lambda x: x and "crs_next_sess=1" in x})
"a",
attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
)
if not element: if not element:
return None return None
link = self._abs_url_from_link(element) link = self._abs_url_from_link(element)
@ -306,10 +278,6 @@ class IliasPage:
def _is_content_tab_selected(self) -> bool: def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None return self._select_content_page_url() is None
def _is_info_tab(self) -> bool:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _select_content_page_url(self) -> Optional[IliasPageElement]: def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find( tab = self._soup.find(
id="tab_view_content", id="tab_view_content",
@ -351,14 +319,14 @@ class IliasPage:
# and just fetch the lone video url! # and just fetch the lone video url!
if len(streams) == 1: if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"] video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)] return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}") log.explain(f"Found multiple videos for stream at {self._source_name}")
items = [] items = []
for stream in sorted(streams, key=lambda stream: stream["content"]): for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4" full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"] video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name)) items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
return items return items
@ -406,8 +374,7 @@ class IliasPage:
for link in links: for link in links:
url = self._abs_url_from_link(link) url = self._abs_url_from_link(link)
name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "") name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
name = _sanitize_path_name(name)
if "file_id" not in url: if "file_id" not in url:
_unexpected_html_warning() _unexpected_html_warning()
@ -418,24 +385,7 @@ class IliasPage:
return items return items
def _find_info_tab_entries(self) -> List[IliasPageElement]: def _find_video_entries(self) -> List[IliasPageElement]:
items = []
links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")
for link in links:
if "cmdClass=ilobjcoursegui" not in link["href"]:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.getText())
))
return items
def _find_opencast_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages # ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing # 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated # 2. The video listing which might be paginated
@ -455,27 +405,27 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params) url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next") log.explain("Found ILIAS video frame page, fetching actual content next")
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")] return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER: if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
# We are in stage 2 - try to break pagination # We are in stage 2 - try to break pagination
return self._find_opencast_video_entries_paginated() return self._find_video_entries_paginated()
return self._find_opencast_video_entries_no_paging() return self._find_video_entries_no_paging()
def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]: def _find_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None: if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.") log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
return self._find_opencast_video_entries_no_paging() return self._find_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None: if id_match is None:
log.warn("Couldn't increase elements per page (table id not found). I might miss elements.") log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
return self._find_opencast_video_entries_no_paging() return self._find_video_entries_no_paging()
table_id = id_match.group(1) table_id = id_match.group(1)
@ -484,9 +434,9 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params) url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry") log.explain("Disabled pagination, retrying folder as a new entry")
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]: def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
""" """
Crawls the "second stage" video page. This page contains the actual video urls. Crawls the "second stage" video page. This page contains the actual video urls.
""" """
@ -498,11 +448,11 @@ class IliasPage:
results: List[IliasPageElement] = [] results: List[IliasPageElement] = []
for link in video_links: for link in video_links:
results.append(self._listed_opencast_video_to_element(link)) results.append(self._listed_video_to_element(link))
return results return results
def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement: def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata. # The link is part of a table with multiple columns, describing metadata.
# 6th or 7th child (1 indexed) is the modification time string. Try to find it # 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date # by parsing backwards from the end and finding something that looks like a date
@ -529,9 +479,7 @@ class IliasPage:
video_url = self._abs_url_from_link(link) video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}") log.explain(f"Found video {video_name!r} at {video_url}")
return IliasPageElement( return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
)
def _find_exercise_entries(self) -> List[IliasPageElement]: def _find_exercise_entries(self) -> List[IliasPageElement]:
if self._soup.find(id="tab_submission"): if self._soup.find(id="tab_submission"):
@ -674,48 +622,9 @@ class IliasPage:
result.append(IliasPageElement(element_type, abs_url, element_name, description=description)) result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards() result += self._find_cards()
result += self._find_mediacast_videos()
return result return result
def _find_mediacast_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
element_name = _sanitize_path_name(
elem.select_one(".ilPlayerPreviewDescription").getText().strip()
)
if not element_name.endswith(".mp4"):
# just to make sure it has some kinda-alrightish ending
element_name = element_name + ".mp4"
video_element = elem.find(name="video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue
videos.append(IliasPageElement(
type=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")),
name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
return None
meta_tag: Tag = description_td.find_all("p")[-1]
if not meta_tag:
return None
updated_str = meta_tag.getText().strip().replace("\n", " ")
updated_str = re.sub(".+?: ", "", updated_str)
return demangle_date(updated_str)
def _is_in_expanded_meeting(self, tag: Tag) -> bool: def _is_in_expanded_meeting(self, tag: Tag) -> bool:
""" """
Returns whether a file is part of an expanded meeting. Returns whether a file is part of an expanded meeting.
@ -887,7 +796,7 @@ class IliasPage:
icon: Tag = card_root.select_one(".il-card-repository-head .icon") icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"] or "xoct" in icon["class"]: if "opencast" in icon["class"] or "xoct" in icon["class"]:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]: if "exc" in icon["class"]:
return IliasElementType.EXERCISE return IliasElementType.EXERCISE
if "webr" in icon["class"]: if "webr" in icon["class"]:
@ -908,8 +817,6 @@ class IliasPage:
return IliasElementType.SURVEY return IliasElementType.SURVEY
if "file" in icon["class"]: if "file" in icon["class"]:
return IliasElementType.FILE return IliasElementType.FILE
if "mcst" in icon["class"]:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
_unexpected_html_warning() _unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}") log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
@ -951,12 +858,6 @@ class IliasPage:
if "baseClass=ilLMPresentationGUI" in parsed_url.query: if "baseClass=ilLMPresentationGUI" in parsed_url.query:
return IliasElementType.LEARNING_MODULE return IliasElementType.LEARNING_MODULE
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so # Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image. # try to guess it from the image.
@ -998,11 +899,7 @@ class IliasPage:
if img_tag is None: if img_tag is None:
img_tag = found_parent.select_one("img.icon") img_tag = found_parent.select_one("img.icon")
is_session_expansion_button = found_parent.find( if img_tag is None and found_parent.find("a", attrs={"href": lambda x: x and "crs_next_sess=" in x}):
"a",
attrs={"href": lambda x: x and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content") log.explain("Found session expansion button, skipping it as it has no content")
return None return None
@ -1012,7 +909,7 @@ class IliasPage:
return None return None
if "opencast" in str(img_tag["alt"]).lower(): if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if str(img_tag["src"]).endswith("icon_exc.svg"): if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE return IliasElementType.EXERCISE
@ -1032,12 +929,6 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_tst.svg"): if str(img_tag["src"]).endswith("icon_tst.svg"):
return IliasElementType.TEST return IliasElementType.TEST
if str(img_tag["src"]).endswith("icon_mcst.svg"):
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_sahs.svg"):
return IliasElementType.SCORM_LEARNING_MODULE
return IliasElementType.FOLDER return IliasElementType.FOLDER
@staticmethod @staticmethod
@ -1067,34 +958,6 @@ class IliasPage:
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:]) rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
@staticmethod
def is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
def _abs_url_from_link(self, link_tag: Tag) -> str: def _abs_url_from_link(self, link_tag: Tag) -> str:
""" """
Create an absolute url from an <a> tag. Create an absolute url from an <a> tag.

View File

@ -81,25 +81,21 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
return self.s.getboolean("forums", fallback=False) return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = { _DIRECTORY_PAGES: Set[IliasElementType] = set([
IliasElementType.EXERCISE, IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES, IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER, IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEETING, IliasElementType.MEETING,
IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, ])
}
_VIDEO_ELEMENTS: Set[IliasElementType] = { _VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.VIDEO,
IliasElementType.MEDIACAST_VIDEO, IliasElementType.VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO, IliasElementType.VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_PLAYER, IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.OPENCAST_VIDEO_FOLDER, ])
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]: def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
@ -140,10 +136,6 @@ def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
return _iorepeat(1, name) return _iorepeat(1, name)
def _get_video_cache_key(element: IliasPageElement) -> str:
return f"ilias-video-cache-{element.id()}"
# Crawler control flow: # Crawler control flow:
# #
# crawl_desktop -+ # crawl_desktop -+
@ -267,8 +259,6 @@ instance's greatest bottleneck.
next_stage_url = None next_stage_url = None
elements.extend(page.get_child_elements()) elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description(): if description_string := page.get_description():
description.append(description_string) description.append(description_string)
@ -407,26 +397,16 @@ instance's greatest bottleneck.
"[bright_black](surveys contain no relevant data)" "[bright_black](surveys contain no relevant data)"
) )
return None return None
elif element.type == IliasElementType.SCORM_LEARNING_MODULE:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](scorm learning modules are not supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE: elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path) return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK: elif element.type == IliasElementType.LINK:
return await self._handle_link(element, element_path) return await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING: elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path) return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO: elif element.type == IliasElementType.VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO_PLAYER:
return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path) return await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER:
return await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES: elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path) return await self._handle_ilias_page(element.url, element, element_path)
else: else:
@ -543,7 +523,7 @@ instance's greatest bottleneck.
raise CrawlError("resolve_link_target failed even after authenticating") raise CrawlError("resolve_link_target failed even after authenticating")
async def _handle_opencast_video( async def _handle_video(
self, self,
element: IliasPageElement, element: IliasPageElement,
element_path: PurePath, element_path: PurePath,
@ -551,8 +531,8 @@ instance's greatest bottleneck.
# Copy old mapping as it is likely still relevant # Copy old mapping as it is likely still relevant
if self.prev_report: if self.prev_report:
self.report.add_custom_value( self.report.add_custom_value(
_get_video_cache_key(element), str(element_path),
self.prev_report.get_custom_value(_get_video_cache_key(element)) self.prev_report.get_custom_value(str(element_path))
) )
# A video might contain other videos, so let's "crawl" the video first # A video might contain other videos, so let's "crawl" the video first
@ -562,69 +542,58 @@ instance's greatest bottleneck.
# to ensure backwards compatibility. # to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS) maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
# If we do not want to crawl it (user filter), we can move on # If we do not want to crawl it (user filter) or we have every file
if not maybe_dl: # from the cached mapping already, we can ignore this and bail
return None if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# If we have every file from the cached mapping already, we can ignore this and bail # during dleanup. We "downloaded" them, just without actually making
if self._all_opencast_videos_locally_present(element, maybe_dl.path): # a network request as we assumed they did not change.
# Mark all existing videos as known to ensure they do not get deleted during cleanup. for video in self._previous_contained_videos(element_path):
# We "downloaded" them, just without actually making a network request as we assumed
# they did not change.
contained = self._previous_contained_opencast_videos(element, maybe_dl.path)
if len(contained) > 1:
# Only do this if we threw away the original dl token,
# to not download single-stream videos twice
for video in contained:
await self.download(video) await self.download(video)
return None return None
return self._download_opencast_video(element, maybe_dl) return self._download_video(element_path, element, maybe_dl)
def _previous_contained_opencast_videos( def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
self, element: IliasPageElement, element_path: PurePath
) -> List[PurePath]:
if not self.prev_report: if not self.prev_report:
return [] return []
custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) custom_value = self.prev_report.get_custom_value(str(video_path))
if not custom_value: if not custom_value:
return [] return []
cached_value = cast(dict[str, Any], custom_value) names = cast(List[str], custom_value)
if "known_paths" not in cached_value or "own_path" not in cached_value: folder = video_path.parent
log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}") return [PurePath(folder, name) for name in names]
return []
transformed_own_path = self._transformer.transform(element_path)
if cached_value["own_path"] != str(transformed_own_path):
log.explain(
f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}"
)
return []
return [PurePath(name) for name in cached_value["known_paths"]]
def _all_opencast_videos_locally_present(self, element: IliasPageElement, element_path: PurePath) -> bool: def _all_videos_locally_present(self, video_path: PurePath) -> bool:
log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}") if contained_videos := self._previous_contained_videos(video_path):
if contained_videos := self._previous_contained_opencast_videos(element, element_path): log.explain_topic(f"Checking local cache for video {video_path.name}")
log.explain( all_found_locally = True
f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}" for video in contained_videos:
) transformed_path = self._to_local_video_path(video)
if all(self._output_dir.resolve(path).exists() for path in contained_videos): if transformed_path:
log.explain("Found all known videos locally, skipping enumeration request") exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
return True return True
log.explain("Missing at least one video, continuing with requests!") log.explain("Missing at least one video, continuing with requests!")
else:
log.explain("No local cache present")
return False return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical @anoncritical
@_iorepeat(3, "downloading video") @_iorepeat(3, "downloading video")
async def _download_opencast_video(self, element: IliasPageElement, dl: DownloadToken) -> None: async def _download_video(
def add_to_report(paths: list[str]) -> None: self,
self.report.add_custom_value( original_path: PurePath,
_get_video_cache_key(element), element: IliasPageElement,
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))} dl: DownloadToken
) ) -> None:
stream_elements: List[IliasPageElement] = []
async with dl as (bar, sink): async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element) page = IliasPage(await self._get_page(element.url), element.url, element)
stream_elements = page.get_child_elements() stream_elements = page.get_child_elements()
@ -635,25 +604,32 @@ instance's greatest bottleneck.
log.explain(f"Using single video mode for {element.name}") log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0] stream_element = stream_elements[0]
transformed_path = self._to_local_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
# We do not have a local cache yet # We do not have a local cache yet
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))]) self.report.add_custom_value(str(original_path), [original_path.name])
return return
contained_video_paths: List[str] = [] contained_video_paths: List[str] = []
for stream_element in stream_elements: for stream_element in stream_elements:
video_path = dl.path.parent / stream_element.name video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER) maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl: if not maybe_dl:
continue continue
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}") log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report(contained_video_paths) self.report.add_custom_value(str(original_path), contained_video_paths)
async def _handle_file( async def _handle_file(
self, self,
@ -665,8 +641,8 @@ instance's greatest bottleneck.
return None return None
return self._download_file(element, maybe_dl) return self._download_file(element, maybe_dl)
@_iorepeat(3, "downloading file")
@anoncritical @anoncritical
@_iorepeat(3, "downloading file")
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None: async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None assert dl # The function is only reached when dl is not None
async with dl as (bar, sink): async with dl as (bar, sink):
@ -724,7 +700,7 @@ instance's greatest bottleneck.
log.explain(f"URL: {next_stage_url}") log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element) page = IliasPage(soup, next_stage_url, None)
if next := page.get_next_stage_element(): if next := page.get_next_stage_element():
next_stage_url = next.url next_stage_url = next.url
@ -736,6 +712,7 @@ instance's greatest bottleneck.
raise CrawlWarning("Failed to extract forum data") raise CrawlWarning("Failed to extract forum data")
if download_data.empty: if download_data.empty:
log.explain("Forum had no threads") log.explain("Forum had no threads")
elements = []
return return
html = await self._post_authenticated(download_data.url, download_data.form_data) html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html)) elements = parse_ilias_forum_export(soupify(html))
@ -786,14 +763,14 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}") log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url) soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element) page = IliasPage(soup, element.url, None)
if next := page.get_learning_module_data(): if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction( elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element cl.path, next.previous_url, "left"
)) ))
elements.append(next) elements.append(next)
elements.extend(await self._crawl_learning_module_direction( elements.extend(await self._crawl_learning_module_direction(
cl.path, next.next_url, "right", element cl.path, next.next_url, "right"
)) ))
# Reflect their natural ordering in the file names # Reflect their natural ordering in the file names
@ -815,8 +792,7 @@ instance's greatest bottleneck.
self, self,
path: PurePath, path: PurePath,
start_url: Optional[str], start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]], dir: Union[Literal["left"], Literal["right"]]
parent_element: IliasPageElement
) -> List[IliasLearningModulePage]: ) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = [] elements: List[IliasLearningModulePage] = []
@ -829,7 +805,7 @@ instance's greatest bottleneck.
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}") log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url) soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element) page = IliasPage(soup, next_element_url, None)
if next := page.get_learning_module_data(): if next := page.get_learning_module_data():
elements.append(next) elements.append(next)
if dir == "left": if dir == "left":
@ -901,7 +877,7 @@ instance's greatest bottleneck.
auth_id = await self._current_auth_id() auth_id = await self._current_auth_id()
async with self.session.get(url) as request: async with self.session.get(url) as request:
soup = soupify(await request.read()) soup = soupify(await request.read())
if IliasPage.is_logged_in(soup): if self._is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
# We weren't authenticated, so try to do that # We weren't authenticated, so try to do that
@ -910,12 +886,11 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die. # Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request: async with self.session.get(url) as request:
soup = soupify(await request.read()) soup = soupify(await request.read())
if IliasPage.is_logged_in(soup): if self._is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}") raise CrawlError("get_page failed even after authenticating")
@staticmethod def _verify_page(self, soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed: if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError( raise CrawlError(
"Unexpectedly encountered ILIAS root page. " "Unexpectedly encountered ILIAS root page. "
@ -973,6 +948,34 @@ instance's greatest bottleneck.
async def _authenticate(self) -> None: async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session) await self._shibboleth_login.login(self.session)
@ staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
class KitShibbolethLogin: class KitShibbolethLogin:
""" """
@ -1119,7 +1122,7 @@ async def _shib_post(
async with session.get(correct_url, allow_redirects=False) as response: async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location") location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}") log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still has a valid session, it will directly respond to the request # If shib still still has a valid session, it will directly respond to the request
if location is None: if location is None:
log.explain("Shib recognized us, returning its response directly") log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read()) return soupify(await response.read())

View File

@ -14,7 +14,7 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:
class Deduplicator: class Deduplicator:
FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)]) FORBIDDEN_CHARS = '<>:"/\\|?*'
FORBIDDEN_NAMES = { FORBIDDEN_NAMES = {
"CON", "PRN", "AUX", "NUL", "CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",

View File

@ -1,5 +1,5 @@
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional, Set
from rich.markup import escape from rich.markup import escape
@ -43,16 +43,24 @@ class Pferd:
crawl_sections = [name for name, _ in config.crawl_sections()] crawl_sections = [name for name, _ in config.crawl_sections()]
crawlers_to_run = [] # With crawl: prefix crawlers_to_run = set() # With crawl: prefix
unknown_names = [] # Without crawl: prefix unknown_names = [] # Without crawl: prefix
for name in cli_crawlers: for name in cli_crawlers:
section_name = f"crawl:{name}" section_name = f"crawl:{name}"
if section_name in crawl_sections: if section_name in crawl_sections:
log.explain(f"Crawler section named {section_name!r} exists") log.explain(f"Crawler section named {section_name!r} exists")
crawlers_to_run.append(section_name) crawlers_to_run.add(section_name)
else: # interprete name as alias of a crawler
log.explain(f"There's no crawler section named {section_name!r}") alias_names = self._find_crawlers_by_alias(name, config)
if alias_names:
crawlers_to_run.update(alias_names)
log.explain_topic(f"Crawler alias {name!r} found corresponding crawler sections:")
for alias_name in alias_names:
log.explain(f"Crawler section named {alias_name!r} with alias {name!r} exists")
if not section_name in crawl_sections and not alias_names:
log.explain(f"There's neither a crawler section named {section_name!r} nor does a crawler with alias {name!r} exist.")
unknown_names.append(name) unknown_names.append(name)
if unknown_names: if unknown_names:
@ -65,6 +73,14 @@ class Pferd:
return crawlers_to_run return crawlers_to_run
def _find_crawlers_by_alias(self, alias: str, config: Config) -> Set[str]:
alias_names = set()
for (section_name, section) in config.crawl_sections():
section_aliases = section.get("aliases", [])
if alias in section_aliases:
alias_names.add(section_name)
return alias_names
def _find_crawlers_to_run( def _find_crawlers_to_run(
self, self,
config: Config, config: Config,

View File

@ -1,2 +1,2 @@
NAME = "PFERD" NAME = "PFERD"
VERSION = "3.5.0" VERSION = "3.4.3"

27
flake.lock generated
View File

@ -1,27 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1694499547,
"narHash": "sha256-R7xMz1Iia6JthWRHDn36s/E248WB1/je62ovC/dUVKI=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e5f018cf150e29aac26c61dac0790ea023c46b24",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

View File

@ -1,41 +0,0 @@
{
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
};
outputs = { self, nixpkgs }:
let
# Helper function to generate an attrset '{ x86_64-linux = f "x86_64-linux"; ... }'.
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed;
in
{
packages = forAllSystems (system:
let pkgs = import nixpkgs { inherit system; };
in
rec {
default = pkgs.python3Packages.buildPythonApplication rec {
pname = "pferd";
# Performing black magic
# Don't worry, I sacrificed enough goats for the next few years
version = (pkgs.lib.importTOML ./PFERD/version.py).VERSION;
format = "pyproject";
src = ./.;
nativeBuildInputs = with pkgs.python3Packages; [
setuptools
];
propagatedBuildInputs = with pkgs.python3Packages; [
aiohttp
beautifulsoup4
rich
keyring
certifi
];
};
});
};
}