Compare commits

..

10 Commits

SHA1 Message Date
bf27f4a686 TODO 2023-04-19 10:13:36 +02:00
5adfdfbd2b Switch http_crawler to requests 2023-04-19 10:12:48 +02:00
5c3942a13d Fix flake8 error 2023-04-19 10:12:48 +02:00
5c9209b12e Document path formatting functions 2023-04-19 10:12:48 +02:00
50c7778d38 Use mypy to install library stub packages 2023-04-19 10:12:48 +02:00
354a22d1e3 Add vscode settings 2023-04-19 10:12:48 +02:00
6f87c5c774 Make ipd crawler synchronous 2023-04-19 10:12:48 +02:00
1ca10571f0 Remove limiter 2023-04-19 10:12:48 +02:00
10e1a5e871 De-Async ilias crawler 2023-04-19 10:12:48 +02:00
a2ffce4702 Make local crawler synchronous 2023-04-19 10:12:48 +02:00
27 changed files with 277 additions and 1028 deletions

.gitignore (2 changes)
View File

@ -2,8 +2,6 @@
/.venv/
/PFERD.egg-info/
__pycache__/
/.vscode/
/.idea/
# pyinstaller
/pferd.spec

.vscode/settings.json (new file, 8 additions)
View File

@ -0,0 +1,8 @@
{
"files.insertFinalNewline": true,
"files.trimFinalNewlines": true,
"python.formatting.provider": "autopep8",
"python.linting.enabled": true,
"python.linting.flake8Enabled": true,
"python.linting.mypyEnabled": true,
}

View File

@ -22,31 +22,10 @@ ambiguous situations.
## Unreleased
### Fixed
- Video name deduplication
## 3.5.0 - 2023-09-13
### Added
- `no-delete-prompt-override` conflict resolution strategy
- Support for ILIAS learning modules
- `show_not_deleted` option to stop printing the "Not Deleted" status or report
message. This combines nicely with the `no-delete-prompt-override` strategy,
causing PFERD to mostly ignore local-only files.
- Support for mediacast video listings
- Crawling of files in info tab
### Changed
- Remove size suffix for files in content pages
### Fixed
- Crawling of courses with the timeline view as the default tab
- Crawling of file and custom opencast cards
- Crawling of button cards without descriptions
- Abort crawling when encountering an unexpected ilias root page redirect
- Sanitize ascii control characters on Windows
- Crawling of paginated past meetings
- Ignore SCORM learning modules
## 3.4.3 - 2022-11-29

View File

@ -26,9 +26,6 @@ default values for the other sections.
`Added ...`) while running a crawler. (Default: `yes`)
- `report`: Whether PFERD should print a report of added, changed and deleted
local files for all crawlers before exiting. (Default: `yes`)
- `show_not_deleted`: Whether PFERD should print messages in status and report
when a local-only file wasn't deleted. Combines nicely with the
`no-delete-prompt-override` conflict resolution strategy.
- `share_cookies`: Whether crawlers should share cookies where applicable. For
example, some crawlers share cookies if they crawl the same website using the
same account. (Default: `yes`)
@ -78,9 +75,6 @@ common to all crawlers:
using `prompt` and always choosing "yes".
- `no-delete`: Never delete local files, but overwrite local files if the
remote file is different.
- `no-delete-prompt-overwrite`: Never delete local files, but prompt to
overwrite local files if the remote file is different. Combines nicely
with the `show_not_deleted` option.
- `transform`: Rules for renaming and excluding certain files and directories.
For more details, see [this section](#transformation-rules). (Default: empty)
- `tasks`: The maximum number of concurrent tasks (such as crawling or

View File

@ -1,6 +1,5 @@
Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine
TheChristophe, Scriptim, thelukasprobst, Toorero
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -47,8 +47,6 @@ def configure_logging_from_args(args: argparse.Namespace) -> None:
log.output_explain = args.explain
if args.status is not None:
log.output_status = args.status
if args.show_not_deleted is not None:
log.output_not_deleted = args.show_not_deleted
if args.report is not None:
log.output_report = args.report
@ -74,8 +72,6 @@ def configure_logging_from_config(args: argparse.Namespace, config: Config) -> N
log.output_status = config.default_section.status()
if args.report is None:
log.output_report = config.default_section.report()
if args.show_not_deleted is None:
log.output_not_deleted = config.default_section.show_not_deleted()
except ConfigOptionError as e:
log.error(str(e))
sys.exit(1)

View File

@ -215,11 +215,6 @@ PARSER.add_argument(
action=BooleanOptionalAction,
help="whether crawlers should share cookies where applicable"
)
PARSER.add_argument(
"--show-not-deleted",
action=BooleanOptionalAction,
help="print messages in status and report when PFERD did not delete a local only file"
)
def load_default_section(
@ -238,8 +233,6 @@ def load_default_section(
section["report"] = "yes" if args.report else "no"
if args.share_cookies is not None:
section["share_cookies"] = "yes" if args.share_cookies else "no"
if args.show_not_deleted is not None:
section["show_not_deleted"] = "yes" if args.show_not_deleted else "no"
SUBPARSERS = PARSER.add_subparsers(title="crawlers")
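The removed `--show-not-deleted` flag used the same `BooleanOptionalAction` pattern as the surviving options such as `--share-cookies`: a flag that is not given stays `None`, which is why `load_default_section` only writes to the config when the parsed value is not `None`. A minimal standalone sketch of that behavior, assuming the stdlib `argparse.BooleanOptionalAction` (Python 3.9+) behaves like the action used here:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--share-cookies",
    action=argparse.BooleanOptionalAction,  # also generates --no-share-cookies
    help="whether crawlers should share cookies where applicable",
)

print(parser.parse_args([]).share_cookies)                      # None: flag not given
print(parser.parse_args(["--share-cookies"]).share_cookies)     # True
print(parser.parse_args(["--no-share-cookies"]).share_cookies)  # False
```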

View File

@ -82,9 +82,6 @@ class DefaultSection(Section):
def report(self) -> bool:
return self.s.getboolean("report", fallback=True)
def show_not_deleted(self) -> bool:
return self.s.getboolean("show_not_deleted", fallback=True)
def share_cookies(self) -> bool:
return self.s.getboolean("share_cookies", fallback=True)
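For context, `report()` and `share_cookies()` above follow the standard configparser pattern: read a boolean from the section and fall back to the documented default when the key is absent. A minimal sketch of that pattern against a plain `configparser` section (the class name is invented; the real `DefaultSection` wraps more options):

```python
import configparser


class DefaultSectionSketch:
    """Made-up name; mirrors the getboolean-with-fallback pattern shown above."""

    def __init__(self, section: configparser.SectionProxy):
        self.s = section

    def report(self) -> bool:
        # Missing keys fall back to the documented default.
        return self.s.getboolean("report", fallback=True)

    def share_cookies(self) -> bool:
        return self.s.getboolean("share_cookies", fallback=True)


parser = configparser.ConfigParser()
parser.read_string("[DEFAULT]\nreport = no\n")
section = DefaultSectionSketch(parser["DEFAULT"])
print(section.report())         # False, read from the config
print(section.share_cookies())  # True, from the fallback
```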

View File

@ -9,7 +9,6 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Ty
from ..auth import Authenticator
from ..config import Config, Section
from ..deduplicator import Deduplicator
from ..limiter import Limiter
from ..logging import ProgressBar, log
from ..output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload
from ..report import MarkConflictError, MarkDuplicateError, Report
@ -98,10 +97,9 @@ def anoncritical(f: AWrapped) -> AWrapped:
class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
def __init__(self, limiter: Limiter, path: PurePath):
def __init__(self, path: PurePath):
super().__init__()
self._limiter = limiter
self._path = path
@property
@ -110,17 +108,15 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
async def _on_aenter(self) -> ProgressBar:
self._stack.callback(lambda: log.status("[bold cyan]", "Crawled", fmt_path(self._path)))
await self._stack.enter_async_context(self._limiter.limit_crawl())
bar = self._stack.enter_context(log.crawl_bar("[bold bright_cyan]", "Crawling", fmt_path(self._path)))
return bar
class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath):
def __init__(self, fs_token: FileSinkToken, path: PurePath):
super().__init__()
self._limiter = limiter
self._fs_token = fs_token
self._path = path
@ -129,7 +125,6 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
return self._path
async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
await self._stack.enter_async_context(self._limiter.limit_download())
sink = await self._stack.enter_async_context(self._fs_token)
# The "Downloaded ..." message is printed in the output dir, not here
bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading",
@ -235,12 +230,6 @@ class Crawler(ABC):
self.name = name
self.error_free = True
self._limiter = Limiter(
task_limit=section.tasks(),
download_limit=section.downloads(),
task_delay=section.task_delay(),
)
self._deduplicator = Deduplicator(section.windows_paths())
self._transformer = Transformer(section.transform())
@ -288,7 +277,7 @@ class Crawler(ABC):
return None
log.explain("Answer: Yes")
return CrawlToken(self._limiter, path)
return CrawlToken(path)
async def download(
self,
@ -313,7 +302,7 @@ class Crawler(ABC):
return None
log.explain("Answer: Yes")
return DownloadToken(self._limiter, fs_token, path)
return DownloadToken(fs_token, path)
async def _cleanup(self) -> None:
log.explain_topic("Decision: Clean up files")

View File

@ -1,12 +1,9 @@
import asyncio
import http.cookies
import ssl
from http.cookiejar import LWPCookieJar
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional
import aiohttp
import certifi
from aiohttp.client import ClientTimeout
import requests
from ..auth import Authenticator
from ..config import Config
@ -35,9 +32,9 @@ class HttpCrawler(Crawler):
self._authentication_id = 0
self._authentication_lock = asyncio.Lock()
self._request_count = 0
self._http_timeout = section.http_timeout()
self._http_timeout = section.http_timeout() # TODO Use or remove
self._cookie_jar = LWPCookieJar()
self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE)
self._shared_cookie_jar_paths: Optional[List[Path]] = None
self._shared_auth = shared_auth
@ -57,7 +54,6 @@ class HttpCrawler(Crawler):
# This should reduce the amount of requests we make: If an authentication is in progress
# all future requests wait for authentication to complete.
async with self._authentication_lock:
self._request_count += 1
return self._authentication_id
async def authenticate(self, caller_auth_id: int) -> None:
@ -106,32 +102,13 @@ class HttpCrawler(Crawler):
self._shared_cookie_jar_paths.append(self._cookie_jar_path)
def _load_cookies_from_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
with open(path, encoding="utf-8") as f:
for i, line in enumerate(f):
# Names of headers are case insensitive
if line[:11].lower() == "set-cookie:":
jar.load(line[11:])
else:
log.explain(f"Line {i} doesn't start with 'Set-Cookie:', ignoring it")
self._cookie_jar.update_cookies(jar)
def _save_cookies_to_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
for morsel in self._cookie_jar:
jar[morsel.key] = morsel
with open(path, "w", encoding="utf-8") as f:
f.write(jar.output(sep="\n"))
f.write("\n") # A trailing newline is just common courtesy
def _load_cookies(self) -> None:
log.explain_topic("Loading cookies")
cookie_jar_path: Optional[Path] = None
if self._shared_cookie_jar_paths is None:
log.explain("Not sharing any cookies")
log.explain("Not sharing cookies")
cookie_jar_path = self._cookie_jar_path
else:
log.explain("Sharing cookies")
@ -154,46 +131,38 @@ class HttpCrawler(Crawler):
log.explain(f"Loading cookies from {fmt_real_path(cookie_jar_path)}")
try:
self._load_cookies_from_file(cookie_jar_path)
self._cookie_jar.load(filename=str(cookie_jar_path))
except Exception as e:
log.explain("Failed to load cookies")
log.explain(str(e))
log.explain(f"Failed to load cookies: {e}")
log.explain("Proceeding without cookies")
def _save_cookies(self) -> None:
log.explain_topic("Saving cookies")
try:
log.explain(f"Saving cookies to {fmt_real_path(self._cookie_jar_path)}")
self._save_cookies_to_file(self._cookie_jar_path)
self._cookie_jar.save(filename=str(self._cookie_jar_path))
except Exception as e:
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e))
log.warn(f"Failed to save cookies: {e}")
async def run(self) -> None:
self._request_count = 0
self._cookie_jar = aiohttp.CookieJar()
self._load_cookies()
async with aiohttp.ClientSession(
headers={"User-Agent": f"{NAME}/{VERSION}"},
cookie_jar=self._cookie_jar,
connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())),
timeout=ClientTimeout(
# 30 minutes. No download in the history of downloads was longer than 30 minutes.
# This is enough to transfer a 600 MB file over a 3 Mib/s connection.
# Allowing an arbitrary value could be annoying for overnight batch jobs
total=15 * 60,
connect=self._http_timeout,
sock_connect=self._http_timeout,
sock_read=self._http_timeout,
)
) as session:
self.session = session
self.session = requests.Session()
self.session.headers["User-Agent"] = f"{NAME}/{VERSION}"
# From the request docs: "All requests code should work out of the box
# with externally provided instances of CookieJar, e.g. LWPCookieJar and
# FileCookieJar."
# https://requests.readthedocs.io/en/latest/api/#requests.cookies.RequestsCookieJar
self.session.cookies = self._cookie_jar # type: ignore
with self.session:
try:
await super().run()
finally:
del self.session
log.explain_topic(f"Total amount of HTTP requests: {self._request_count}")
# They are saved in authenticate, but a final save won't hurt
self._save_cookies()
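The rewritten `run()` hands an `LWPCookieJar` straight to a `requests.Session` and uses the jar's own `load`/`save` for persistence, as the quoted requests documentation permits. A minimal standalone sketch of that round-trip; the cookie file name and URL are placeholders:

```python
from http.cookiejar import LWPCookieJar

import requests

COOKIE_FILE = "cookies.txt"  # placeholder; the crawler resolves its jar inside the output dir

jar = LWPCookieJar()
try:
    jar.load(filename=COOKIE_FILE)
except FileNotFoundError:
    pass  # first run, nothing to load yet

session = requests.Session()
session.headers["User-Agent"] = "PFERD-sketch/0.0"
# requests accepts externally provided cookielib jars such as LWPCookieJar.
session.cookies = jar  # type: ignore[assignment]

with session:
    session.get("https://example.com")  # cookies set by the server end up in `jar`

jar.save(filename=COOKIE_FILE)  # persist them for the next run
```

Note that `LWPCookieJar.save()` skips session cookies unless `ignore_discard=True` is passed, so only persistent cookies survive between runs by default.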

View File

@ -1,10 +1,6 @@
from enum import Enum
from typing import Optional
import bs4
from PFERD.utils import soupify
_link_template_plain = "{{link}}"
_link_template_fancy = """
<!DOCTYPE html>
@ -98,71 +94,6 @@ _link_template_internet_shortcut = """
URL={{link}}
""".strip()
_learning_module_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>{{name}}</title>
</head>
<style>
* {
box-sizing: border-box;
}
.center-flex {
display: flex;
align-items: center;
justify-content: center;
}
.nav {
display: flex;
justify-content: space-between;
}
</style>
<body class="center-flex">
{{body}}
</body>
</html>
"""
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those.
for elem in body.select(".il-copg-mob-fullscreen-modal"):
elem.decompose()
nav_template = """
<div class="nav">
{{left}}
{{right}}
</div>
"""
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip()
left = f'<a href="{prev}">{text}</a>'
else:
left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip()
right = f'<a href="{next}">{text}</a>'
else:
right = "<span></span>"
if top_nav := body.select_one(".ilc_page_tnav_TopNavigation"):
top_nav.replace_with(
soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode())
)
if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"):
bot_nav.replace_with(soupify(nav_template.replace(
"{{left}}", left).replace("{{right}}", right).encode())
)
body = body.prettify()
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name)
class Links(Enum):
IGNORE = "ignore"
@ -171,24 +102,24 @@ class Links(Enum):
INTERNET_SHORTCUT = "internet-shortcut"
def template(self) -> Optional[str]:
if self == Links.FANCY:
if self == self.FANCY:
return _link_template_fancy
elif self == Links.PLAINTEXT:
elif self == self.PLAINTEXT:
return _link_template_plain
elif self == Links.INTERNET_SHORTCUT:
elif self == self.INTERNET_SHORTCUT:
return _link_template_internet_shortcut
elif self == Links.IGNORE:
elif self == self.IGNORE:
return None
raise ValueError("Missing switch case")
def extension(self) -> Optional[str]:
if self == Links.FANCY:
if self == self.FANCY:
return ".html"
elif self == Links.PLAINTEXT:
elif self == self.PLAINTEXT:
return ".txt"
elif self == Links.INTERNET_SHORTCUT:
elif self == self.INTERNET_SHORTCUT:
return ".url"
elif self == Links.IGNORE:
elif self == self.IGNORE:
return None
raise ValueError("Missing switch case")

View File

@ -82,7 +82,7 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
dummy.decompose()
if len(children) > 1:
continue
if isinstance(type(children[0]), Comment):
if type(children[0]) == Comment:
dummy.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
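As an aside on the one-line change above: BeautifulSoup comments are instances of `Comment`, so the node itself can also be tested directly with `isinstance`. A small sketch of the three checks, assuming `bs4` is installed:

```python
from bs4 import BeautifulSoup, Comment

soup = BeautifulSoup("<div><!-- hi --></div>", "html.parser")
node = soup.div.contents[0]

print(type(node) == Comment)            # True: the comparison used in this hunk
print(isinstance(node, Comment))        # True: checking the node itself also works
print(isinstance(type(node), Comment))  # False: type(node) is a class, not a node
```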

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Union, cast
from typing import Dict, List, Optional, Union
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
@ -22,18 +22,13 @@ class IliasElementType(Enum):
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module"
BOOKING = "booking"
MEETING = "meeting"
SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEDIACAST_VIDEO = "mediacast_video"
OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
VIDEO = "video"
VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder"
VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
@dataclass
@ -49,8 +44,7 @@ class IliasPageElement:
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
r"target=[a-z]+_(?P<id>\d+)"
]
for regex in regexes:
@ -77,14 +71,6 @@ class IliasForumThread:
mtime: Optional[datetime]
@dataclass
class IliasLearningModulePage:
title: str
content: Tag
next_url: Optional[str]
previous_url: Optional[str]
class IliasPage:
def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
@ -93,16 +79,6 @@ class IliasPage:
self._page_type = source_element.type if source_element else None
self._source_name = source_element.name if source_element else ""
@staticmethod
def is_root_page(soup: BeautifulSoup) -> bool:
permalink = soup.find(id="current_perma_link")
if permalink is None:
return False
value = permalink.attrs.get("value")
if value is None:
return False
return "goto.php?target=root_" in value
def get_child_elements(self) -> List[IliasPageElement]:
"""
Return all child page elements you can find here.
@ -110,9 +86,9 @@ class IliasPage:
if self._is_video_player():
log.explain("Page is a video player, extracting URL")
return self._player_to_video()
if self._is_opencast_video_listing():
log.explain("Page is an opencast video listing, searching for elements")
return self._find_opencast_video_entries()
if self._is_video_listing():
log.explain("Page is a video listing, searching for elements")
return self._find_video_entries()
if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries()
@ -122,25 +98,9 @@ class IliasPage:
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
if self._is_info_tab():
log.explain("Page is info tab, searching for elements")
return self._find_info_tab_entries()
log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = self._soup.find(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
@ -166,34 +126,6 @@ class IliasPage:
return BeautifulSoup(raw_html, "html.parser")
def get_learning_module_data(self) -> Optional[IliasLearningModulePage]:
if not self._is_learning_module_page():
return None
content = self._soup.select_one("#ilLMPageContent")
title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip()
return IliasLearningModulePage(
title=title,
content=content,
next_url=self._find_learning_module_next(),
previous_url=self._find_learning_module_prev()
)
def _find_learning_module_next(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_rnavlink_RightNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def _find_learning_module_prev(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_lnavlink_LeftNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def get_download_forum_data(self) -> Optional[IliasDownloadForumData]:
form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x})
if not form:
@ -220,18 +152,14 @@ class IliasPage:
if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0]
if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination")
return self._find_opencast_video_entries_paginated()[0]
return self._find_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected():
if self._page_type != IliasElementType.INFO_TAB:
log.explain("Selecting content tab")
return self._select_content_page_url()
else:
log.explain("Crawling info tab, skipping content select")
return self._select_content_page_url()
return None
def _is_forum_page(self) -> bool:
@ -244,7 +172,7 @@ class IliasPage:
def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup)
def _is_opencast_video_listing(self) -> bool:
def _is_video_listing(self) -> bool:
if self._is_ilias_opencast_embedding():
return True
@ -284,20 +212,11 @@ class IliasPage:
return False
return "target=copa_" in link.get("value")
def _is_learning_module_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=pg_" in link.get("value")
def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = self._soup.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
)
element = self._soup.find("a", attrs={"href": lambda x: x and "crs_next_sess=1" in x})
if not element:
return None
link = self._abs_url_from_link(element)
@ -306,10 +225,6 @@ class IliasPage:
def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None
def _is_info_tab(self) -> bool:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find(
id="tab_view_content",
@ -351,14 +266,14 @@ class IliasPage:
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name))
items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
return items
@ -406,8 +321,7 @@ class IliasPage:
for link in links:
url = self._abs_url_from_link(link)
name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "")
name = _sanitize_path_name(name)
name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
if "file_id" not in url:
_unexpected_html_warning()
@ -418,24 +332,7 @@ class IliasPage:
return items
def _find_info_tab_entries(self) -> List[IliasPageElement]:
items = []
links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")
for link in links:
if "cmdClass=ilobjcoursegui" not in link["href"]:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.getText())
))
return items
def _find_opencast_video_entries(self) -> List[IliasPageElement]:
def _find_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated
@ -455,27 +352,27 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next")
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_opencast_video_entries_paginated()
return self._find_video_entries_paginated()
return self._find_opencast_video_entries_no_paging()
return self._find_video_entries_no_paging()
def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]:
def _find_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
return self._find_opencast_video_entries_no_paging()
return self._find_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None:
log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
return self._find_opencast_video_entries_no_paging()
return self._find_video_entries_no_paging()
table_id = id_match.group(1)
@ -484,9 +381,9 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]
return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
@ -498,11 +395,11 @@ class IliasPage:
results: List[IliasPageElement] = []
for link in video_links:
results.append(self._listed_opencast_video_to_element(link))
results.append(self._listed_video_to_element(link))
return results
def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement:
def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
@ -529,9 +426,7 @@ class IliasPage:
video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}")
return IliasPageElement(
IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
)
return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
def _find_exercise_entries(self) -> List[IliasPageElement]:
if self._soup.find(id="tab_submission"):
@ -674,48 +569,9 @@ class IliasPage:
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
result += self._find_mediacast_videos()
return result
def _find_mediacast_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
element_name = _sanitize_path_name(
elem.select_one(".ilPlayerPreviewDescription").getText().strip()
)
if not element_name.endswith(".mp4"):
# just to make sure it has some kinda-alrightish ending
element_name = element_name + ".mp4"
video_element = elem.find(name="video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue
videos.append(IliasPageElement(
type=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")),
name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
return None
meta_tag: Tag = description_td.find_all("p")[-1]
if not meta_tag:
return None
updated_str = meta_tag.getText().strip().replace("\n", " ")
updated_str = re.sub(".+?: ", "", updated_str)
return demangle_date(updated_str)
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
"""
Returns whether a file is part of an expanded meeting.
@ -887,7 +743,7 @@ class IliasPage:
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"] or "xoct" in icon["class"]:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
@ -908,8 +764,6 @@ class IliasPage:
return IliasElementType.SURVEY
if "file" in icon["class"]:
return IliasElementType.FILE
if "mcst" in icon["class"]:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
@ -948,15 +802,6 @@ class IliasPage:
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
if "baseClass=ilLMPresentationGUI" in parsed_url.query:
return IliasElementType.LEARNING_MODULE
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
@ -998,11 +843,7 @@ class IliasPage:
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
is_session_expansion_button = found_parent.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
if img_tag is None and found_parent.find("a", attrs={"href": lambda x: x and "crs_next_sess=" in x}):
log.explain("Found session expansion button, skipping it as it has no content")
return None
@ -1012,7 +853,7 @@ class IliasPage:
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
@ -1032,12 +873,6 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_tst.svg"):
return IliasElementType.TEST
if str(img_tag["src"]).endswith("icon_mcst.svg"):
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_sahs.svg"):
return IliasElementType.SCORM_LEARNING_MODULE
return IliasElementType.FOLDER
@staticmethod
@ -1067,34 +902,6 @@ class IliasPage:
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
@staticmethod
def is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
def _abs_url_from_link(self, link_tag: Tag) -> str:
"""
Create an absolute url from an <a> tag.

View File

@ -1,11 +1,8 @@
import asyncio
import base64
import os
import re
from collections.abc import Awaitable, Coroutine
from pathlib import PurePath
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Union, cast
from urllib.parse import urljoin
from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
import aiohttp
import yarl
@ -19,10 +16,10 @@ from ...output_dir import FileSink, Redownload
from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import AWrapped, CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .file_templates import Links, learning_module_template
from .file_templates import Links
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasPage, IliasPageElement,
_sanitize_path_name, parse_ilias_forum_export)
TargetType = Union[str, int]
@ -81,25 +78,21 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = {
_DIRECTORY_PAGES: Set[IliasElementType] = set([
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
_VIDEO_ELEMENTS: Set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
_VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.VIDEO,
IliasElementType.VIDEO_PLAYER,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
@ -133,17 +126,6 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
return decorator
def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
"""
Wraps any I/O exception in a CrawlWarning.
"""
return _iorepeat(1, name)
def _get_video_cache_key(element: IliasPageElement) -> str:
return f"ilias-video-cache-{element.id()}"
# Crawler control flow:
#
# crawl_desktop -+
@ -237,116 +219,45 @@ instance's greatest bottleneck.
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
def ensure_is_valid_course_id(parent: Optional[IliasPageElement], soup: BeautifulSoup) -> None:
if parent is None and expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"):
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
while next_stage_url:
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"):
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
async def _handle_ilias_page(
self,
url: str,
parent: IliasPageElement,
path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
return None
return self._crawl_ilias_page(url, parent, maybe_cl)
await self._crawl_ilias_page(url, None, cl, ensure_is_valid_course_id)
@anoncritical
async def _crawl_ilias_page(
self,
url: str,
parent: IliasPageElement,
parent: Optional[IliasPageElement],
cl: CrawlToken,
next_stage_hook: Callable[[Optional[IliasPageElement], BeautifulSoup], None] = lambda a, b: None
) -> None:
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
async with cl:
next_stage_url: Optional[str] = url
current_parent = parent
@_iorepeat(3, "crawling folder")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = parent
while next_stage_url:
soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
while next_stage_url:
soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
next_stage_hook(current_parent, soup)
elements.extend(page.get_child_elements())
if description_string := page.get_description():
description.append(description_string)
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
# Fill up our task list with the found elements
await gather_elements()
for element in sorted(page.get_child_elements(), key=lambda e: e.id()):
await self._handle_ilias_element(cl.path, element)
if description:
await self._download_description(cl.path, description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
if description_string := page.get_description():
await self._download_description(cl.path, description_string)
# These decorators only apply *to this method* and *NOT* to the returned
# awaitables!
@ -358,7 +269,7 @@ instance's greatest bottleneck.
self,
parent_path: PurePath,
element: IliasPageElement,
) -> Optional[Coroutine[Any, Any, None]]:
) -> None:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
@ -380,7 +291,7 @@ instance's greatest bottleneck.
return None
if element.type == IliasElementType.FILE:
return await self._handle_file(element, element_path)
await self._handle_file(element, element_path)
elif element.type == IliasElementType.FORUM:
if not self._forums:
log.status(
@ -390,7 +301,7 @@ instance's greatest bottleneck.
"[bright_black](enable with option 'forums')"
)
return None
return await self._handle_forum(element, element_path)
await self._handle_forum(element, element_path)
elif element.type == IliasElementType.TEST:
log.status(
"[bold bright_black]",
@ -407,28 +318,19 @@ instance's greatest bottleneck.
"[bright_black](surveys contain no relevant data)"
)
return None
elif element.type == IliasElementType.SCORM_LEARNING_MODULE:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](scorm learning modules are not supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK:
return await self._handle_link(element, element_path)
await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO_PLAYER:
return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path)
await self._handle_booking(element, element_path)
elif element.type == IliasElementType.VIDEO:
await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER:
await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path)
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
await self._crawl_ilias_page(element.url, element, maybe_cl)
else:
# This will retry it a few times, failing everytime. It doesn't make any network
# requests, so that's fine.
@ -438,7 +340,7 @@ instance's greatest bottleneck.
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
) -> None:
log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
@ -455,7 +357,7 @@ instance's greatest bottleneck.
if not maybe_dl:
return None
return self._download_link(element, link_template_maybe, maybe_dl)
await self._download_link(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(3, "resolving link")
@ -543,16 +445,16 @@ instance's greatest bottleneck.
raise CrawlError("resolve_link_target failed even after authenticating")
async def _handle_opencast_video(
async def _handle_video(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
) -> None:
# Copy old mapping as it is likely still relevant
if self.prev_report:
self.report.add_custom_value(
_get_video_cache_key(element),
self.prev_report.get_custom_value(_get_video_cache_key(element))
str(element_path),
self.prev_report.get_custom_value(str(element_path))
)
# A video might contain other videos, so let's "crawl" the video first
@ -562,69 +464,58 @@ instance's greatest bottleneck.
# to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
# If we do not want to crawl it (user filter), we can move on
if not maybe_dl:
return None
# If we have every file from the cached mapping already, we can ignore this and bail
if self._all_opencast_videos_locally_present(element, maybe_dl.path):
# Mark all existing videos as known to ensure they do not get deleted during cleanup.
# We "downloaded" them, just without actually making a network request as we assumed
# they did not change.
contained = self._previous_contained_opencast_videos(element, maybe_dl.path)
if len(contained) > 1:
# Only do this if we threw away the original dl token,
# to not download single-stream videos twice
for video in contained:
await self.download(video)
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# during dleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
await self.download(video)
return None
return self._download_opencast_video(element, maybe_dl)
await self._download_video(element_path, element, maybe_dl)
def _previous_contained_opencast_videos(
self, element: IliasPageElement, element_path: PurePath
) -> List[PurePath]:
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element))
custom_value = self.prev_report.get_custom_value(str(video_path))
if not custom_value:
return []
cached_value = cast(dict[str, Any], custom_value)
if "known_paths" not in cached_value or "own_path" not in cached_value:
log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}")
return []
transformed_own_path = self._transformer.transform(element_path)
if cached_value["own_path"] != str(transformed_own_path):
log.explain(
f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}"
)
return []
return [PurePath(name) for name in cached_value["known_paths"]]
names = cast(List[str], custom_value)
folder = video_path.parent
return [PurePath(folder, name) for name in names]
def _all_opencast_videos_locally_present(self, element: IliasPageElement, element_path: PurePath) -> bool:
log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}")
if contained_videos := self._previous_contained_opencast_videos(element, element_path):
log.explain(
f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}"
)
if all(self._output_dir.resolve(path).exists() for path in contained_videos):
log.explain("Found all known videos locally, skipping enumeration request")
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._to_local_video_path(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
return True
log.explain("Missing at least one video, continuing with requests!")
else:
log.explain("No local cache present")
return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical
@_iorepeat(3, "downloading video")
async def _download_opencast_video(self, element: IliasPageElement, dl: DownloadToken) -> None:
def add_to_report(paths: list[str]) -> None:
self.report.add_custom_value(
_get_video_cache_key(element),
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}
)
async def _download_video(
self,
original_path: PurePath,
element: IliasPageElement,
dl: DownloadToken
) -> None:
stream_elements: List[IliasPageElement] = []
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
stream_elements = page.get_child_elements()
@ -635,38 +526,45 @@ instance's greatest bottleneck.
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
transformed_path = self._to_local_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
# We do not have a local cache yet
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report([str(self._transformer.transform(dl.path))])
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), [original_path.name])
return
contained_video_paths: List[str] = []
for stream_element in stream_elements:
video_path = dl.path.parent / stream_element.name
video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
continue
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report(contained_video_paths)
self.report.add_custom_value(str(original_path), contained_video_paths)
async def _handle_file(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
) -> None:
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_file(element, maybe_dl)
await self._download_file(element, maybe_dl)
@_iorepeat(3, "downloading file")
@anoncritical
@_iorepeat(3, "downloading file")
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
@ -706,11 +604,11 @@ instance's greatest bottleneck.
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
) -> None:
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
return self._crawl_forum(element, maybe_cl)
await self._crawl_forum(element, maybe_cl)
@_iorepeat(3, "crawling forum")
@anoncritical
@ -724,7 +622,7 @@ instance's greatest bottleneck.
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element)
page = IliasPage(soup, next_stage_url, None)
if next := page.get_next_stage_element():
next_stage_url = next.url
@ -736,6 +634,7 @@ instance's greatest bottleneck.
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
elements = []
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
@ -767,142 +666,12 @@ instance's greatest bottleneck.
sink.file.write(content.encode("utf-8"))
sink.done()
async def _handle_learning_module(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
return self._crawl_learning_module(element, maybe_cl)
@_iorepeat(3, "crawling learning module")
@anoncritical
async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasLearningModulePage] = []
async with cl:
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element
))
elements.append(next)
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.next_url, "right", element
))
# Reflect their natural ordering in the file names
for index, lm_element in enumerate(elements):
lm_element.title = f"{index:02}_{lm_element.title}"
tasks: List[Awaitable[None]] = []
for index, elem in enumerate(elements):
prev_url = elements[index - 1].title if index > 0 else None
next_url = elements[index + 1].title if index < len(elements) - 1 else None
tasks.append(asyncio.create_task(
self._download_learning_module_page(cl.path, elem, prev_url, next_url)
))
# And execute them
await self.gather(tasks)
async def _crawl_learning_module_direction(
self,
path: PurePath,
start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]],
parent_element: IliasPageElement
) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = []
if not start_url:
return elements
next_element_url: Optional[str] = start_url
counter = 0
while next_element_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
next_element_url = next.previous_url
else:
next_element_url = next.next_url
counter += 1
return elements
@anoncritical
@_iorepeat(3, "saving learning module page")
async def _download_learning_module_page(
self,
parent_path: PurePath,
element: IliasLearningModulePage,
prev: Optional[str],
next: Optional[str]
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path)
if not maybe_dl:
return
my_path = self._transformer.transform(maybe_dl.path)
if not my_path:
return
if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p:
prev = os.path.relpath(prev_p, my_path.parent)
else:
prev = None
if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p:
next = os.path.relpath(next_p, my_path.parent)
else:
next = None
async with maybe_dl as (bar, sink):
content = element.content
content = await self.internalize_images(content)
sink.file.write(learning_module_template(content, maybe_dl.path.name, prev, next).encode("utf-8"))
sink.done()
async def internalize_images(self, tag: Tag) -> Tag:
"""
Tries to fetch ILIAS images and embed them as base64 data.
"""
log.explain_topic("Internalizing images")
for elem in tag.find_all(recursive=True):
if not isinstance(elem, Tag):
continue
if elem.name == "img":
if src := elem.attrs.get("src", None):
url = urljoin(_ILIAS_URL, src)
if not url.startswith(_ILIAS_URL):
continue
log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"):
# For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + elem.attrs["src"]
return tag
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
async def _get_page(self, url: str) -> BeautifulSoup:
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
soup = soupify(await request.read())
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
if self._is_logged_in(soup):
return soup
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
@ -910,28 +679,15 @@ instance's greatest bottleneck.
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request:
soup = soupify(await request.read())
if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
@staticmethod
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError(
"Unexpectedly encountered ILIAS root page. "
"This usually happens because the ILIAS instance is broken. "
"If so, wait a day or two and try again. "
"It could also happen because a crawled element links to the ILIAS root page. "
"If so, use a transform with a ! as target to ignore the particular element. "
f"The redirect came from {url}"
)
return soup
if self._is_logged_in(soup):
return soup
raise CrawlError("get_page failed even after authenticating")
async def _post_authenticated(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> bytes:
) -> BeautifulSoup:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData()
@@ -951,28 +707,40 @@ instance's greatest bottleneck.
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
async def _get_authenticated(self, url: str) -> bytes:
auth_id = await self._current_auth_id()
async with self.session.get(url, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("get_authenticated failed even after authenticating")
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login", failure_is_error=True)
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
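# Aside, not part of the diff: the heuristic above treats a page as logged in when
# its metabar contains no login link. Tiny illustration with a made-up snippet:
from bs4 import BeautifulSoup
snippet = BeautifulSoup('<div class="il-maincontrols-metabar"></div>', "html.parser")
# _is_logged_in(snippet) returns True here; add an <a href="...login.php..."> to
# the metabar and it returns False.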
class KitShibbolethLogin:
"""
@@ -1119,7 +887,7 @@ async def _shib_post(
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still has a valid session, it will directly respond to the request
# If shib still still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())


@@ -2,7 +2,7 @@ import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
from typing import List, Optional, Pattern, Set, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@@ -64,42 +64,37 @@ class KitIpdCrawler(HttpCrawler):
self._file_regex = section.link_regex()
async def _run(self) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
cl = await self.crawl(PurePath("."))
if not cl:
return
tasks: List[Awaitable[None]] = []
async with maybe_cl:
async with cl:
for item in await self._fetch_items():
if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item))
await self._crawl_folder(item)
else:
# Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item))
await self.gather(tasks)
await self._download_file(PurePath("."), item)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
if not await self.crawl(path):
return
tasks = [self._download_file(path, file) for file in folder.files]
await self.gather(tasks)
for file in folder.files:
await self._download_file(path, file)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
element_path = parent / file.name
maybe_dl = await self.download(element_path)
if not maybe_dl:
dl = await self.download(element_path)
if not dl:
return
async with maybe_dl as (bar, sink):
async with dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page()
page, url = await self._get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
@@ -159,12 +154,12 @@ class KitIpdCrawler(HttpCrawler):
sink.done()
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This
# hack enables those pages to be crawled, and should hopefully not
# cause issues on other pages.
content = (await request.read()).decode("utf-8")
content = re.sub(r"<!--.*?-->", "", content)
return soupify(content.encode("utf-8")), str(request.url)
async def _get_page(self) -> Tuple[BeautifulSoup, str]:
response = self.session.get(self._url)
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This
# hack enables those pages to be crawled, and should hopefully not
# cause issues on other pages.
content = re.sub(r"<!--.*?-->", "", response.text)
return soupify(content.encode("utf-8")), str(response.url)
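# Aside, not part of the diff: the re.sub above drops HTML comments before the page
# is parsed. Standalone illustration with made-up markup:
import re
html = "<p>Slides</p><!-- comment that confuses the parser --><p>Exercises</p>"
print(re.sub(r"<!--.*?-->", "", html))  # <p>Slides</p><p>Exercises</p>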


@@ -71,8 +71,6 @@ class LocalCrawler(Crawler):
if not cl:
return
tasks = []
async with cl:
await asyncio.sleep(random.uniform(
0.5 * self._crawl_delay,
@@ -81,9 +79,7 @@ class LocalCrawler(Crawler):
for child in path.iterdir():
pure_child = cl.path / child.name
tasks.append(self._crawl_path(child, pure_child))
await self.gather(tasks)
await self._crawl_path(child, pure_child)
async def _crawl_file(self, path: Path, pure: PurePath) -> None:
stat = path.stat()


@@ -14,7 +14,7 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:
class Deduplicator:
FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)])
FORBIDDEN_CHARS = '<>:"/\\|?*'
FORBIDDEN_NAMES = {
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",


@@ -1,97 +0,0 @@
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Optional
@dataclass
class Slot:
active: bool = False
last_left: Optional[float] = None
class Limiter:
def __init__(
self,
task_limit: int,
download_limit: int,
task_delay: float
):
if task_limit <= 0:
raise ValueError("task limit must be at least 1")
if download_limit <= 0:
raise ValueError("download limit must be at least 1")
if download_limit > task_limit:
raise ValueError("download limit can't be greater than task limit")
if task_delay < 0:
raise ValueError("Task delay must not be negative")
self._slots = [Slot() for _ in range(task_limit)]
self._downloads = download_limit
self._delay = task_delay
self._condition = asyncio.Condition()
def _acquire_slot(self) -> Optional[Slot]:
for slot in self._slots:
if not slot.active:
slot.active = True
return slot
return None
async def _wait_for_slot_delay(self, slot: Slot) -> None:
if slot.last_left is not None:
delay = slot.last_left + self._delay - time.time()
if delay > 0:
await asyncio.sleep(delay)
def _release_slot(self, slot: Slot) -> None:
slot.last_left = time.time()
slot.active = False
@asynccontextmanager
async def limit_crawl(self) -> AsyncIterator[None]:
slot: Slot
async with self._condition:
while True:
if found_slot := self._acquire_slot():
slot = found_slot
break
await self._condition.wait()
await self._wait_for_slot_delay(slot)
try:
yield
finally:
async with self._condition:
self._release_slot(slot)
self._condition.notify_all()
@asynccontextmanager
async def limit_download(self) -> AsyncIterator[None]:
slot: Slot
async with self._condition:
while True:
if self._downloads <= 0:
await self._condition.wait()
continue
if found_slot := self._acquire_slot():
slot = found_slot
self._downloads -= 1
break
await self._condition.wait()
await self._wait_for_slot_delay(slot)
try:
yield
finally:
async with self._condition:
self._release_slot(slot)
self._downloads += 1
self._condition.notify_all()
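# Aside, not part of the diff: rough usage pattern of the Limiter removed above,
# with made-up limits. Crawl tasks compete for slots; downloads are additionally
# capped by the download budget.
async def _limiter_demo() -> None:
    limiter = Limiter(task_limit=4, download_limit=2, task_delay=0.5)
    async with limiter.limit_crawl():
        ...  # at most 4 concurrent tasks, each slot spaced by task_delay seconds
    async with limiter.limit_download():
        ...  # at most 2 of those tasks may download at the same time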


@@ -59,7 +59,6 @@ class Log:
# Whether different parts of the output are enabled or disabled
self.output_explain = False
self.output_status = True
self.output_not_deleted = True
self.output_report = True
def _update_live(self) -> None:
@@ -208,17 +207,6 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def not_deleted(self, style: str, action: str, text: str, suffix: str = "") -> None:
"""
Print a message for a local only file that wasn't
deleted while crawling. Allows markup in the "style"
argument which will be applied to the "action" string.
"""
if self.output_status and self.output_not_deleted:
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def report(self, text: str) -> None:
"""
Print a report after crawling. Allows markup.
@@ -227,14 +215,6 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_report:
self.print(text)
def report_not_deleted(self, text: str) -> None:
"""
Print a report for a local only file that wasn't deleted after crawling. Allows markup.
"""
if self.output_report and self.output_not_deleted:
self.print(text)
@contextmanager
def _bar(
self,


@@ -44,7 +44,6 @@ class OnConflict(Enum):
LOCAL_FIRST = "local-first"
REMOTE_FIRST = "remote-first"
NO_DELETE = "no-delete"
NO_DELETE_PROMPT_OVERWRITE = "no-delete-prompt-overwrite"
@staticmethod
def from_string(string: str) -> "OnConflict":
@@ -52,7 +51,7 @@ class OnConflict(Enum):
return OnConflict(string)
except ValueError:
raise ValueError("must be one of 'prompt', 'local-first',"
" 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'")
" 'remote-first', 'no-delete'")
@dataclass
@@ -265,7 +264,7 @@ class OutputDirectory:
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
prompt = f"Replace {fmt_path(path)} with remote file?"
return await prompt_yes_no(prompt, default=False)
@@ -284,7 +283,7 @@ class OutputDirectory:
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
prompt = f"Recursively delete {fmt_path(path)} and replace with remote file?"
return await prompt_yes_no(prompt, default=False)
@@ -304,7 +303,7 @@ class OutputDirectory:
path: PurePath,
parent: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
prompt = f"Delete {fmt_path(parent)} so remote file {fmt_path(path)} can be downloaded?"
return await prompt_yes_no(prompt, default=False)
@@ -331,7 +330,7 @@ class OutputDirectory:
return False
elif on_conflict == OnConflict.REMOTE_FIRST:
return True
elif on_conflict in {OnConflict.NO_DELETE, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
elif on_conflict == OnConflict.NO_DELETE:
return False
# This should never be reached
@@ -496,7 +495,7 @@ class OutputDirectory:
except OSError:
pass
else:
log.not_deleted("[bold bright_magenta]", "Not deleted", fmt_path(pure))
log.status("[bold bright_magenta]", "Not deleted", fmt_path(pure))
self._report.not_delete_file(pure)
def load_prev_report(self) -> None:


@@ -180,7 +180,7 @@ class Pferd:
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files):
something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True


@@ -92,17 +92,32 @@ def url_set_query_params(url: str, params: Dict[str, str]) -> str:
def str_path(path: PurePath) -> str:
"""
Turn a path into a string, in a platform-independent way.
This function always uses "/" as path separator, even on Windows.
"""
if not path.parts:
return "."
return "/".join(path.parts)
def fmt_path(path: PurePath) -> str:
"""
Turn a path into a delimited string.
This is useful if file or directory names contain weird characters like
newlines, leading/trailing whitespace or unprintable characters. This way,
they are escaped and visible to the user.
"""
return repr(str_path(path))
def fmt_real_path(path: Path) -> str:
return repr(str(path.absolute()))
"""
Like fmt_path, but resolves the path before converting it to a string.
"""
return fmt_path(path.absolute())
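# Aside, not part of the diff: what the helpers above produce, illustratively:
from pathlib import PurePath
print(str_path(PurePath(".")))            # .
print(str_path(PurePath("foo/bar.txt")))  # foo/bar.txt (also on Windows)
print(fmt_path(PurePath("weird\nname")))  # 'weird\nname' -- escapes stay visible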
class ReusableAsyncContextManager(ABC, Generic[T]):


@@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.5.0"
VERSION = "3.4.3"

flake.lock generated

@@ -1,27 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1694499547,
"narHash": "sha256-R7xMz1Iia6JthWRHDn36s/E248WB1/je62ovC/dUVKI=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e5f018cf150e29aac26c61dac0790ea023c46b24",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}


@@ -1,41 +0,0 @@
{
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
};
outputs = { self, nixpkgs }:
let
# Helper function to generate an attrset '{ x86_64-linux = f "x86_64-linux"; ... }'.
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed;
in
{
packages = forAllSystems (system:
let pkgs = import nixpkgs { inherit system; };
in
rec {
default = pkgs.python3Packages.buildPythonApplication rec {
pname = "pferd";
# Performing black magic
# Don't worry, I sacrificed enough goats for the next few years
version = (pkgs.lib.importTOML ./PFERD/version.py).VERSION;
format = "pyproject";
src = ./.;
nativeBuildInputs = with pkgs.python3Packages; [
setuptools
];
propagatedBuildInputs = with pkgs.python3Packages; [
aiohttp
beautifulsoup4
rich
keyring
certifi
];
};
});
};
}


@@ -14,4 +14,4 @@ pip install --editable .
# Installing tools and type hints
pip install --upgrade mypy flake8 autopep8 isort pyinstaller
pip install --upgrade types-chardet types-certifi
mypy PFERD --install-types --non-interactive


@@ -11,6 +11,7 @@ install_requires =
rich>=11.0.0
keyring>=23.5.0
certifi>=2021.10.8
requests>=2.28.1
[options.entry_points]
console_scripts =