diff --git a/PFERD/auth/__init__.py b/PFERD/auth/__init__.py index 80d4586..7295c7a 100644 --- a/PFERD/auth/__init__.py +++ b/PFERD/auth/__init__.py @@ -1,5 +1,5 @@ +from collections.abc import Callable from configparser import SectionProxy -from typing import Callable, Dict from ..config import Config from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401 @@ -18,7 +18,7 @@ AuthConstructor = Callable[ Authenticator, ] -AUTHENTICATORS: Dict[str, AuthConstructor] = { +AUTHENTICATORS: dict[str, AuthConstructor] = { "credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), "keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)), "pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)), diff --git a/PFERD/auth/authenticator.py b/PFERD/auth/authenticator.py index 643a2d5..417b7ba 100644 --- a/PFERD/auth/authenticator.py +++ b/PFERD/auth/authenticator.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -from typing import Tuple from ..config import Section @@ -35,7 +34,7 @@ class Authenticator(ABC): self.name = name @abstractmethod - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: pass async def username(self) -> str: diff --git a/PFERD/auth/credential_file.py b/PFERD/auth/credential_file.py index 94ffa73..cb7834c 100644 --- a/PFERD/auth/credential_file.py +++ b/PFERD/auth/credential_file.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import Tuple from ..config import Config from ..utils import fmt_real_path @@ -23,7 +22,9 @@ class CredentialFileAuthenticator(Authenticator): with open(path, encoding="utf-8") as f: lines = list(f) except UnicodeDecodeError: - raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8") + raise AuthLoadError( + f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8" + ) from None except OSError as e: raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e @@ -42,5 +43,5 @@ class CredentialFileAuthenticator(Authenticator): self._username = uline[9:] self._password = pline[9:] - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: return self._username, self._password diff --git a/PFERD/auth/keyring.py b/PFERD/auth/keyring.py index 7ff2673..e69a69e 100644 --- a/PFERD/auth/keyring.py +++ b/PFERD/auth/keyring.py @@ -1,4 +1,4 @@ -from typing import Optional, Tuple, cast +from typing import Optional, cast import keyring @@ -27,7 +27,7 @@ class KeyringAuthenticator(Authenticator): self._password_invalidated = False self._username_fixed = section.username() is not None - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: # Request the username if self._username is None: async with log.exclusive_output(): diff --git a/PFERD/auth/pass_.py b/PFERD/auth/pass_.py index 4c8e775..c5d9b24 100644 --- a/PFERD/auth/pass_.py +++ b/PFERD/auth/pass_.py @@ -1,6 +1,5 @@ import re import subprocess -from typing import List, Tuple from ..logging import log from .authenticator import Authenticator, AuthError, AuthSection @@ -12,11 +11,11 @@ class PassAuthSection(AuthSection): self.missing_value("passname") return value - def username_prefixes(self) -> List[str]: + def username_prefixes(self) -> list[str]: value = self.s.get("username_prefixes", "login,username,user") return [prefix.lower() for prefix in value.split(",")] - def password_prefixes(self) -> List[str]: + def 
password_prefixes(self) -> list[str]: value = self.s.get("password_prefixes", "password,pass,secret") return [prefix.lower() for prefix in value.split(",")] @@ -31,14 +30,14 @@ class PassAuthenticator(Authenticator): self._username_prefixes = section.username_prefixes() self._password_prefixes = section.password_prefixes() - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: log.explain_topic("Obtaining credentials from pass") try: log.explain(f"Calling 'pass show {self._passname}'") result = subprocess.check_output(["pass", "show", self._passname], text=True) except subprocess.CalledProcessError as e: - raise AuthError(f"Failed to get password info from {self._passname}: {e}") + raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e prefixed = {} unprefixed = [] diff --git a/PFERD/auth/simple.py b/PFERD/auth/simple.py index 831c12f..dea4b67 100644 --- a/PFERD/auth/simple.py +++ b/PFERD/auth/simple.py @@ -1,4 +1,4 @@ -from typing import Optional, Tuple +from typing import Optional from ..logging import log from ..utils import agetpass, ainput @@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator): self._username_fixed = self.username is not None self._password_fixed = self.password is not None - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: if self._username is not None and self._password is not None: return self._username, self._password diff --git a/PFERD/auth/tfa.py b/PFERD/auth/tfa.py index 26b1383..6ae48fe 100644 --- a/PFERD/auth/tfa.py +++ b/PFERD/auth/tfa.py @@ -1,5 +1,3 @@ -from typing import Tuple - from ..logging import log from ..utils import ainput from .authenticator import Authenticator, AuthError @@ -17,7 +15,7 @@ class TfaAuthenticator(Authenticator): code = await ainput("TFA code: ") return code - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: raise AuthError("TFA authenticator does not support usernames") def invalidate_username(self) -> None: diff --git a/PFERD/cli/parser.py b/PFERD/cli/parser.py index 12bfeac..c9bec13 100644 --- a/PFERD/cli/parser.py +++ b/PFERD/cli/parser.py @@ -1,8 +1,9 @@ import argparse import configparser from argparse import ArgumentTypeError +from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Callable, List, Optional, Sequence, Union +from typing import Any, Optional from ..output_dir import OnConflict, Redownload from ..version import NAME, VERSION @@ -16,7 +17,7 @@ class ParserLoadError(Exception): class BooleanOptionalAction(argparse.Action): def __init__( self, - option_strings: List[str], + option_strings: list[str], dest: Any, default: Any = None, type: Any = None, @@ -51,7 +52,7 @@ class BooleanOptionalAction(argparse.Action): self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, - values: Union[str, Sequence[Any], None], + values: str | Sequence[Any] | None, option_string: Optional[str] = None, ) -> None: if option_string and option_string in self.option_strings: @@ -72,7 +73,7 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]: try: return inner(input) except ValueError as e: - raise ArgumentTypeError(e) + raise ArgumentTypeError(e) from e return wrapper diff --git a/PFERD/config.py b/PFERD/config.py index 1a0f017..7da2889 100644 --- a/PFERD/config.py +++ b/PFERD/config.py @@ -3,7 +3,7 @@ import os import sys from configparser import ConfigParser, SectionProxy from pathlib 
import Path -from typing import Any, List, NoReturn, Optional, Tuple +from typing import Any, NoReturn, Optional from rich.markup import escape @@ -126,13 +126,13 @@ class Config: with open(path, encoding="utf-8") as f: parser.read_file(f, source=str(path)) except FileNotFoundError: - raise ConfigLoadError(path, "File does not exist") + raise ConfigLoadError(path, "File does not exist") from None except IsADirectoryError: - raise ConfigLoadError(path, "That's a directory, not a file") + raise ConfigLoadError(path, "That's a directory, not a file") from None except PermissionError: - raise ConfigLoadError(path, "Insufficient permissions") + raise ConfigLoadError(path, "Insufficient permissions") from None except UnicodeDecodeError: - raise ConfigLoadError(path, "File is not encoded using UTF-8") + raise ConfigLoadError(path, "File is not encoded using UTF-8") from None def dump(self, path: Optional[Path] = None) -> None: """ @@ -150,8 +150,8 @@ class Config: try: path.parent.mkdir(parents=True, exist_ok=True) - except PermissionError: - raise ConfigDumpError(path, "Could not create parent directory") + except PermissionError as e: + raise ConfigDumpError(path, "Could not create parent directory") from e try: # Ensuring we don't accidentally overwrite any existing files by @@ -167,16 +167,16 @@ class Config: with open(path, "w", encoding="utf-8") as f: self._parser.write(f) else: - raise ConfigDumpError(path, "File already exists") + raise ConfigDumpError(path, "File already exists") from None except IsADirectoryError: - raise ConfigDumpError(path, "That's a directory, not a file") - except PermissionError: - raise ConfigDumpError(path, "Insufficient permissions") + raise ConfigDumpError(path, "That's a directory, not a file") from None + except PermissionError as e: + raise ConfigDumpError(path, "Insufficient permissions") from e def dump_to_stdout(self) -> None: self._parser.write(sys.stdout) - def crawl_sections(self) -> List[Tuple[str, SectionProxy]]: + def crawl_sections(self) -> list[tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("crawl:"): @@ -184,7 +184,7 @@ class Config: return result - def auth_sections(self) -> List[Tuple[str, SectionProxy]]: + def auth_sections(self) -> list[tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("auth:"): diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py index 04a5e3f..6032c97 100644 --- a/PFERD/crawl/__init__.py +++ b/PFERD/crawl/__init__.py @@ -1,5 +1,5 @@ +from collections.abc import Callable from configparser import SectionProxy -from typing import Callable, Dict from ..auth import Authenticator from ..config import Config @@ -13,12 +13,12 @@ CrawlerConstructor = Callable[ str, # Name (without the "crawl:" prefix) SectionProxy, # Crawler's section of global config Config, # Global config - Dict[str, Authenticator], # Loaded authenticators by name + dict[str, Authenticator], # Loaded authenticators by name ], Crawler, ] -CRAWLERS: Dict[str, CrawlerConstructor] = { +CRAWLERS: dict[str, CrawlerConstructor] = { "local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c), "ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), "kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), diff --git a/PFERD/crawl/crawler.py b/PFERD/crawl/crawler.py index f1aec5a..e2cdf30 100644 --- a/PFERD/crawl/crawler.py +++ b/PFERD/crawl/crawler.py @@ -1,10 +1,10 @@ import asyncio 
import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Coroutine +from collections.abc import Awaitable, Callable, Coroutine, Sequence from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar +from typing import Any, Optional, TypeVar from ..auth import Authenticator from ..config import Config, Section @@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]): return bar -class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): +class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath): super().__init__() @@ -128,7 +128,7 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): def path(self) -> PurePath: return self._path - async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]: + async def _on_aenter(self) -> tuple[ProgressBar, FileSink]: await self._stack.enter_async_context(self._limiter.limit_download()) sink = await self._stack.enter_async_context(self._fs_token) # The "Downloaded ..." message is printed in the output dir, not here @@ -205,7 +205,7 @@ class CrawlerSection(Section): on_windows = os.name == "nt" return self.s.getboolean("windows_paths", fallback=on_windows) - def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: + def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator: value = self.s.get("auth") if value is None: self.missing_value("auth") @@ -262,7 +262,7 @@ class Crawler(ABC): return self._output_dir @staticmethod - async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: + async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]: """ Similar to asyncio.gather. However, in the case of an exception, all still running tasks are cancelled and the exception is rethrown. 
@@ -394,7 +394,7 @@ class Crawler(ABC): log.warn("Couldn't find or load old report") return - seen: Set[PurePath] = set() + seen: set[PurePath] = set() for known in sorted(self.prev_report.found_paths): looking_at = list(reversed(known.parents)) + [known] for path in looking_at: diff --git a/PFERD/crawl/http_crawler.py b/PFERD/crawl/http_crawler.py index 572b39d..830f537 100644 --- a/PFERD/crawl/http_crawler.py +++ b/PFERD/crawl/http_crawler.py @@ -3,7 +3,7 @@ import http.cookies import ssl from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Dict, List, Optional, Tuple, cast +from typing import Any, Optional, cast import aiohttp import certifi @@ -43,7 +43,7 @@ class HttpCrawler(Crawler): self._http_timeout = section.http_timeout() self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE) - self._shared_cookie_jar_paths: Optional[List[Path]] = None + self._shared_cookie_jar_paths: Optional[list[Path]] = None self._shared_auth = shared_auth self._output_dir.register_reserved(self.COOKIE_FILE) @@ -98,7 +98,7 @@ class HttpCrawler(Crawler): """ raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation") - def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None: + def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None: if not self._shared_auth: return @@ -219,7 +219,7 @@ class HttpCrawler(Crawler): etags[str(path)] = etag self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags) - async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]: + async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]: """ Requests the ETag and Last-Modified headers of a resource via a HEAD request. If no entity tag / modification date can be obtained, the corresponding value will be None. diff --git a/PFERD/crawl/ilias/async_helper.py b/PFERD/crawl/ilias/async_helper.py index 5e586b1..2e6b301 100644 --- a/PFERD/crawl/ilias/async_helper.py +++ b/PFERD/crawl/ilias/async_helper.py @@ -1,5 +1,6 @@ import asyncio -from typing import Any, Callable, Optional +from collections.abc import Callable +from typing import Any, Optional import aiohttp @@ -15,9 +16,9 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla try: return await f(*args, **kwargs) except aiohttp.ContentTypeError: # invalid content type - raise CrawlWarning("ILIAS returned an invalid content type") + raise CrawlWarning("ILIAS returned an invalid content type") from None except aiohttp.TooManyRedirects: - raise CrawlWarning("Got stuck in a redirect loop") + raise CrawlWarning("Got stuck in a redirect loop") from None except aiohttp.ClientPayloadError as e: # encoding or not enough bytes last_exception = e except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc.
diff --git a/PFERD/crawl/ilias/file_templates.py b/PFERD/crawl/ilias/file_templates.py index 814bb7b..37691b2 100644 --- a/PFERD/crawl/ilias/file_templates.py +++ b/PFERD/crawl/ilias/file_templates.py @@ -297,9 +297,7 @@ class Links(Enum): raise ValueError("Missing switch case") def collection_as_one(self) -> bool: - if self == Links.FANCY: - return True - return False + return self == Links.FANCY def extension(self) -> Optional[str]: if self == Links.FANCY: @@ -355,4 +353,4 @@ return Links(string) except ValueError: options = [f"'{option.value}'" for option in Links] - raise ValueError(f"must be one of {', '.join(options)}") + raise ValueError(f"must be one of {', '.join(options)}") from None diff --git a/PFERD/crawl/ilias/ilias_web_crawler.py b/PFERD/crawl/ilias/ilias_web_crawler.py index e6929b5..b8212a4 100644 --- a/PFERD/crawl/ilias/ilias_web_crawler.py +++ b/PFERD/crawl/ilias/ilias_web_crawler.py @@ -4,7 +4,7 @@ import os import re from collections.abc import Awaitable, Coroutine from pathlib import PurePath -from typing import Any, Dict, List, Literal, Optional, Set, Union, cast +from typing import Any, Literal, Optional, cast from urllib.parse import urljoin import aiohttp @@ -33,7 +33,7 @@ from .kit_ilias_html import ( ) from .shibboleth_login import ShibbolethLogin -TargetType = Union[str, int] +TargetType = str | int class LoginTypeLocal: @@ -49,7 +49,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection): return base_url - def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]: + def login(self) -> Literal["shibboleth"] | LoginTypeLocal: login_type = self.s.get("login_type") if not login_type: self.missing_value("login_type") @@ -63,7 +63,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection): self.invalid_value("login_type", login_type, "Should be <shibboleth | local>") - def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]: + def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]: value: Optional[str] = self.s.get("tfa_auth") if value is None: return None @@ -110,7 +110,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection): return self.s.getboolean("forums", fallback=False) -_DIRECTORY_PAGES: Set[IliasElementType] = { +_DIRECTORY_PAGES: set[IliasElementType] = { IliasElementType.EXERCISE, IliasElementType.EXERCISE_FILES, IliasElementType.EXERCISE_OVERVIEW, @@ -122,7 +122,7 @@ _DIRECTORY_PAGES: Set[IliasElementType] = { IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, } -_VIDEO_ELEMENTS: Set[IliasElementType] = { +_VIDEO_ELEMENTS: set[IliasElementType] = { IliasElementType.MEDIACAST_VIDEO, IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO, @@ -172,7 +172,7 @@ class IliasWebCrawler(HttpCrawler): name: str, section: IliasWebCrawlerSection, config: Config, - authenticators: Dict[str, Authenticator], + authenticators: dict[str, Authenticator], ): # Setting a main authenticator for cookie sharing auth = section.auth(authenticators) @@ -201,7 +201,7 @@ instance's greatest bottleneck. self._links = section.links() self._videos = section.videos() self._forums = section.forums() - self._visited_urls: Dict[str, PurePath] = dict() + self._visited_urls: dict[str, PurePath] = dict() async def _run(self) -> None: if isinstance(self._target, int): @@ -264,9 +264,9 @@
expected_course_id: Optional[int] = None, crawl_nested_courses: bool = False, ) -> None: - elements: List[IliasPageElement] = [] + elements: list[IliasPageElement] = [] # A list as variable redefinitions are not propagated to outer scopes - description: List[BeautifulSoup] = [] + description: list[BeautifulSoup] = [] @_iorepeat(3, "crawling folder") async def gather_elements() -> None: @@ -309,7 +309,7 @@ instance's greatest bottleneck. elements.sort(key=lambda e: e.id()) - tasks: List[Awaitable[None]] = [] + tasks: list[Awaitable[None]] = [] for element in elements: if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses): tasks.append(asyncio.create_task(handle)) @@ -340,15 +340,14 @@ instance's greatest bottleneck. ) return None - if element.type in _VIDEO_ELEMENTS: - if not self._videos: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](enable with option 'videos')", - ) - return None + if element.type in _VIDEO_ELEMENTS and not self._videos: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](enable with option 'videos')", + ) + return None if element.type == IliasElementType.FILE: return await self._handle_file(element, element_path) @@ -522,8 +521,8 @@ instance's greatest bottleneck. sink.file.write(rendered.encode("utf-8")) sink.done() - async def _resolve_link_target(self, export_url: str) -> Union[BeautifulSoup, Literal["none"]]: - async def impl() -> Optional[Union[BeautifulSoup, Literal["none"]]]: + async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]: + async def impl() -> Optional[BeautifulSoup | Literal["none"]]: async with self.session.get(export_url, allow_redirects=False) as resp: # No redirect means we were authenticated if hdrs.LOCATION not in resp.headers: @@ -658,7 +657,7 @@ instance's greatest bottleneck. def _previous_contained_opencast_videos( self, element: IliasPageElement, element_path: PurePath - ) -> List[PurePath]: + ) -> list[PurePath]: if not self.prev_report: return [] custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) @@ -714,7 +713,7 @@ instance's greatest bottleneck. add_to_report([str(self._transformer.transform(dl.path))]) return - contained_video_paths: List[str] = [] + contained_video_paths: list[str] = [] for stream_element in stream_elements: video_path = dl.path.parent / stream_element.name @@ -832,7 +831,7 @@ instance's greatest bottleneck. elements = parse_ilias_forum_export(soupify(export)) - tasks: List[Awaitable[None]] = [] + tasks: list[Awaitable[None]] = [] for thread in elements: tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url))) @@ -842,7 +841,7 @@ instance's greatest bottleneck. @anoncritical @_iorepeat(3, "saving forum thread") async def _download_forum_thread( - self, parent_path: PurePath, thread: Union[IliasForumThread, IliasPageElement], forum_url: str + self, parent_path: PurePath, thread: IliasForumThread | IliasPageElement, forum_url: str ) -> None: path = parent_path / (_sanitize_path_name(thread.name) + ".html") maybe_dl = await self.download(path, mtime=thread.mtime) @@ -871,7 +870,7 @@ instance's greatest bottleneck. 
@_iorepeat(3, "crawling learning module") @anoncritical async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None: - elements: List[IliasLearningModulePage] = [] + elements: list[IliasLearningModulePage] = [] async with cl: log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") @@ -891,7 +890,7 @@ instance's greatest bottleneck. for index, lm_element in enumerate(elements): lm_element.title = f"{index:02}_{lm_element.title}" - tasks: List[Awaitable[None]] = [] + tasks: list[Awaitable[None]] = [] for index, elem in enumerate(elements): prev_url = elements[index - 1].title if index > 0 else None next_url = elements[index + 1].title if index < len(elements) - 1 else None @@ -906,10 +905,10 @@ instance's greatest bottleneck. self, path: PurePath, start_url: Optional[str], - dir: Union[Literal["left"], Literal["right"]], + dir: Literal["left"] | Literal["right"], parent_element: IliasPageElement, - ) -> List[IliasLearningModulePage]: - elements: List[IliasLearningModulePage] = [] + ) -> list[IliasLearningModulePage]: + elements: list[IliasLearningModulePage] = [] if not start_url: return elements @@ -923,10 +922,7 @@ instance's greatest bottleneck. page = IliasPage(soup, parent_element) if next := page.get_learning_module_data(): elements.append(next) - if dir == "left": - next_element_url = next.previous_url - else: - next_element_url = next.next_url + next_element_url = next.previous_url if dir == "left" else next.next_url counter += 1 return elements @@ -950,16 +946,10 @@ instance's greatest bottleneck. if prev: prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) - if prev_p: - prev = cast(str, os.path.relpath(prev_p, my_path.parent)) - else: - prev = None + prev = cast(str, os.path.relpath(prev_p, my_path.parent)) if prev_p else None if next: next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) - if next_p: - next = cast(str, os.path.relpath(next_p, my_path.parent)) - else: - next = None + next = cast(str, os.path.relpath(next_p, my_path.parent)) if next_p else None async with maybe_dl as (bar, sink): content = element.content @@ -973,14 +963,13 @@ instance's greatest bottleneck. """ log.explain_topic("Internalizing images") for elem in tag.find_all(recursive=True): - if elem.name == "img": - if src := elem.attrs.get("src", None): - url = urljoin(self._base_url, cast(str, src)) - if not url.startswith(self._base_url): - continue - log.explain(f"Internalizing {url!r}") - img = await self._get_authenticated(url) - elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() + if elem.name == "img" and (src := elem.attrs.get("src", None)): + url = urljoin(self._base_url, cast(str, src)) + if not url.startswith(self._base_url): + continue + log.explain(f"Internalizing {url!r}") + img = await self._get_authenticated(url) + elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"): # For unknown reasons the protocol seems to be stripped. elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"]) @@ -1025,7 +1014,7 @@ instance's greatest bottleneck. 
) return soup - async def _post(self, url: str, data: dict[str, Union[str, List[str]]]) -> bytes: + async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes: form_data = aiohttp.FormData() for key, val in data.items(): form_data.add_field(key, val) diff --git a/PFERD/crawl/ilias/kit_ilias_html.py b/PFERD/crawl/ilias/kit_ilias_html.py index 4abb350..d7f6f8d 100644 --- a/PFERD/crawl/ilias/kit_ilias_html.py +++ b/PFERD/crawl/ilias/kit_ilias_html.py @@ -1,9 +1,10 @@ import json import re +from collections.abc import Callable from dataclasses import dataclass from datetime import date, datetime, timedelta from enum import Enum -from typing import Callable, Dict, Optional, Union, cast +from typing import Optional, cast from urllib.parse import urljoin, urlparse from bs4 import BeautifulSoup, Tag @@ -13,7 +14,7 @@ from PFERD.crawl.crawler import CrawlWarning from PFERD.logging import log from PFERD.utils import url_set_query_params -TargetType = Union[str, int] +TargetType = str | int class TypeMatcher: @@ -308,7 +309,7 @@ class IliasPageElement: """ # This checks whether we can reach a `:` without passing a `-` - if re.search(r"^[^-]+: ", meeting_name): + if re.search(r"^[^-]+: ", meeting_name): # noqa: SIM108 # Meeting name only contains date: "05. Jan 2000:" split_delimiter = ":" else: @@ -331,7 +332,7 @@ @dataclass class IliasDownloadForumData: url: str - form_data: Dict[str, Union[str, list[str]]] + form_data: dict[str, str | list[str]] empty: bool @@ -433,21 +434,20 @@ class IliasPage: for p in paragraphs: if p.find_parent(class_=is_interesting_class): continue - if "ilc_media_cont_MediaContainer" in p["class"]: + if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")): # We have an embedded video which should be downloaded by _find_mob_videos - if video := p.select_one("video"): - url, title = self._find_mob_video_url_title(video, p) - raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;' - raw_html += 'display: flex; justify-content: center; align-items: center;' - raw_html += ' margin: 0.5rem;">' - if url is not None: - raw_html += f'<a href="{url}" target="_blank">External Video: {title}</a>' - else: - raw_html += f"Video elided. Filename: '{title}'." - raw_html += "</div>\n" - continue + url, title = self._find_mob_video_url_title(video, p) + raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;' + raw_html += 'display: flex; justify-content: center; align-items: center;' + raw_html += ' margin: 0.5rem;">' + if url is not None: + raw_html += f'<a href="{url}" target="_blank">External Video: {title}</a>' + else: + raw_html += f"Video elided. Filename: '{title}'." + raw_html += "</div>\n" + continue # Ignore special listings (like folder groupings) if "ilc_section_Special" in p["class"]: @@ -794,7 +794,7 @@ class IliasPage: is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None - if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER: + if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER: # We are in stage 2 - try to break pagination return self._find_opencast_video_entries_paginated() @@ -1302,10 +1302,7 @@ class IliasPage: ), ) caption_container = caption_parent.find_next_sibling("div") - if caption_container: - description = caption_container.get_text().strip() - else: - description = None + description = caption_container.get_text().strip() if caption_container else None if not typ: _unexpected_html_warning() @@ -1444,9 +1441,7 @@ return True # The individual video player wrapper page has nothing of the above. # Match it by its playerContainer. - if soup.select_one("#playerContainer") is not None: - return True - return False + return soup.select_one("#playerContainer") is not None @staticmethod def _find_date_in_text(text: str) -> Optional[datetime]: @@ -1505,11 +1500,11 @@ def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[dateti # Normalize whitespace because users date_str = re.sub(r"\s+", " ", date_str) - date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I) - date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I) - date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I) + date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, flags=re.I) + date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, flags=re.I) + date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, flags=re.I) date_str = date_str.strip() - for german, english in zip(german_months, english_months): + for german, english in zip(german_months, english_months, strict=True): date_str = date_str.replace(german, english) # Remove trailing dots for abbreviations, e.g. "20.
Apr 2020" date_str = date_str.replace(english + ".", english) diff --git a/PFERD/crawl/ilias/kit_ilias_web_crawler.py b/PFERD/crawl/ilias/kit_ilias_web_crawler.py index fc1d58f..5088e01 100644 --- a/PFERD/crawl/ilias/kit_ilias_web_crawler.py +++ b/PFERD/crawl/ilias/kit_ilias_web_crawler.py @@ -1,4 +1,4 @@ -from typing import Dict, Literal +from typing import Literal from ...auth import Authenticator from ...config import Config @@ -26,7 +26,7 @@ class KitIliasWebCrawler(IliasWebCrawler): name: str, section: KitIliasWebCrawlerSection, config: Config, - authenticators: Dict[str, Authenticator], + authenticators: dict[str, Authenticator], ): super().__init__(name, section, config, authenticators) diff --git a/PFERD/crawl/kit_ipd_crawler.py b/PFERD/crawl/kit_ipd_crawler.py index f47c969..165a661 100644 --- a/PFERD/crawl/kit_ipd_crawler.py +++ b/PFERD/crawl/kit_ipd_crawler.py @@ -1,9 +1,11 @@ import os import re +from collections.abc import Awaitable, Generator, Iterable from dataclasses import dataclass from datetime import datetime from pathlib import PurePath -from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast +from re import Pattern +from typing import Any, Optional, Union, cast from urllib.parse import urljoin from bs4 import BeautifulSoup, Tag @@ -44,7 +46,7 @@ class KitIpdFile: @dataclass class KitIpdFolder: name: str - entries: List[Union[KitIpdFile, "KitIpdFolder"]] + entries: list[Union[KitIpdFile, "KitIpdFolder"]] def explain(self) -> None: log.explain_topic(f"Folder {self.name!r}") @@ -68,7 +70,7 @@ class KitIpdCrawler(HttpCrawler): if not maybe_cl: return - tasks: List[Awaitable[None]] = [] + tasks: list[Awaitable[None]] = [] async with maybe_cl: for item in await self._fetch_items(): @@ -120,9 +122,9 @@ class KitIpdCrawler(HttpCrawler): async with maybe_dl as (bar, sink): await self._stream_from_url(file.url, element_path, sink, bar) - async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]: + async def _fetch_items(self) -> Iterable[KitIpdFile | KitIpdFolder]: page, url = await self.get_page() - elements: List[Tag] = self._find_file_links(page) + elements: list[Tag] = self._find_file_links(page) # do not add unnecessary nesting for a single
<h1>
heading drop_h1: bool = len(page.find_all(name="h1")) <= 1 @@ -151,7 +153,7 @@ class KitIpdCrawler(HttpCrawler): name = os.path.basename(url) return KitIpdFile(name, url) - def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]: + def _find_file_links(self, tag: Tag | BeautifulSoup) -> list[Tag]: return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex})) def _abs_url_from_link(self, url: str, link_tag: Tag) -> str: @@ -172,7 +174,7 @@ class KitIpdCrawler(HttpCrawler): self._add_etag_to_report(path, resp.headers.get("ETag")) - async def get_page(self) -> Tuple[BeautifulSoup, str]: + async def get_page(self) -> tuple[BeautifulSoup, str]: async with self.session.get(self._url) as request: # The web page for Algorithmen für Routenplanung contains some # weird comments that beautifulsoup doesn't parse correctly. This diff --git a/PFERD/deduplicator.py b/PFERD/deduplicator.py index c204726..18940c5 100644 --- a/PFERD/deduplicator.py +++ b/PFERD/deduplicator.py @@ -1,5 +1,5 @@ +from collections.abc import Iterator from pathlib import PurePath -from typing import Iterator, Set from .logging import log from .utils import fmt_path @@ -43,7 +43,7 @@ class Deduplicator: def __init__(self, windows_paths: bool) -> None: self._windows_paths = windows_paths - self._known: Set[PurePath] = set() + self._known: set[PurePath] = set() def _add(self, path: PurePath) -> None: self._known.add(path) diff --git a/PFERD/limiter.py b/PFERD/limiter.py index 49de0ed..01b4914 100644 --- a/PFERD/limiter.py +++ b/PFERD/limiter.py @@ -1,8 +1,9 @@ import asyncio import time +from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import AsyncIterator, Optional +from typing import Optional @dataclass diff --git a/PFERD/logging.py b/PFERD/logging.py index e371494..a810aa9 100644 --- a/PFERD/logging.py +++ b/PFERD/logging.py @@ -1,8 +1,9 @@ import asyncio import sys import traceback +from collections.abc import AsyncIterator, Iterator from contextlib import AbstractContextManager, asynccontextmanager, contextmanager -from typing import AsyncIterator, Iterator, List, Optional +from typing import Optional from rich.console import Console, Group from rich.live import Live @@ -60,7 +61,7 @@ class Log: self._showing_progress = False self._progress_suspended = False self._lock = asyncio.Lock() - self._lines: List[str] = [] + self._lines: list[str] = [] # Whether different parts of the output are enabled or disabled self.output_explain = False diff --git a/PFERD/output_dir.py b/PFERD/output_dir.py index c452c0f..159e1db 100644 --- a/PFERD/output_dir.py +++ b/PFERD/output_dir.py @@ -4,12 +4,13 @@ import os import random import shutil import string -from contextlib import contextmanager +from collections.abc import Iterator +from contextlib import contextmanager, suppress from dataclasses import dataclass from datetime import datetime from enum import Enum from pathlib import Path, PurePath -from typing import BinaryIO, Iterator, Optional, Tuple +from typing import BinaryIO, Optional from .logging import log from .report import Report, ReportLoadError @@ -35,7 +36,7 @@ class Redownload(Enum): try: return Redownload(string) except ValueError: - raise ValueError("must be one of 'never', 'never-smart', 'always', 'always-smart'") + raise ValueError("must be one of 'never', 'never-smart', 'always', 'always-smart'") from None class OnConflict(Enum): @@ -53,7 +54,7 @@ class OnConflict(Enum): raise ValueError( "must be one of 
'prompt', 'local-first'," " 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'" - ) + ) from None @dataclass @@ -177,8 +178,8 @@ class OutputDirectory: try: self._root.mkdir(parents=True, exist_ok=True) - except OSError: - raise OutputDirError("Failed to create base directory") + except OSError as e: + raise OutputDirError("Failed to create base directory") from e def register_reserved(self, path: PurePath) -> None: self._report.mark_reserved(path) @@ -358,7 +359,7 @@ class OutputDirectory: async def _create_tmp_file( self, local_path: Path, - ) -> Tuple[Path, BinaryIO]: + ) -> tuple[Path, BinaryIO]: """ May raise an OutputDirError. """ @@ -509,10 +510,8 @@ class OutputDirectory: await self._cleanup(child, pure_child) if delete_self: - try: + with suppress(OSError): path.rmdir() - except OSError: - pass async def _cleanup_file(self, path: Path, pure: PurePath) -> None: if self._report.is_marked(pure): diff --git a/PFERD/pferd.py b/PFERD/pferd.py index c805c13..1fe37d0 100644 --- a/PFERD/pferd.py +++ b/PFERD/pferd.py @@ -1,5 +1,5 @@ from pathlib import Path, PurePath -from typing import Dict, List, Optional +from typing import Optional from rich.markup import escape @@ -15,7 +15,7 @@ class PferdLoadError(Exception): class Pferd: - def __init__(self, config: Config, cli_crawlers: Optional[List[str]], cli_skips: Optional[List[str]]): + def __init__(self, config: Config, cli_crawlers: Optional[list[str]], cli_skips: Optional[list[str]]): """ May throw PferdLoadError. """ @@ -23,10 +23,10 @@ class Pferd: self._config = config self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers, cli_skips) - self._authenticators: Dict[str, Authenticator] = {} - self._crawlers: Dict[str, Crawler] = {} + self._authenticators: dict[str, Authenticator] = {} + self._crawlers: dict[str, Crawler] = {} - def _find_config_crawlers(self, config: Config) -> List[str]: + def _find_config_crawlers(self, config: Config) -> list[str]: crawl_sections = [] for name, section in config.crawl_sections(): @@ -37,7 +37,7 @@ class Pferd: return crawl_sections - def _find_cli_crawlers(self, config: Config, cli_crawlers: List[str]) -> List[str]: + def _find_cli_crawlers(self, config: Config, cli_crawlers: list[str]) -> list[str]: if len(cli_crawlers) != len(set(cli_crawlers)): raise PferdLoadError("Some crawlers were selected multiple times") @@ -68,12 +68,12 @@ class Pferd: def _find_crawlers_to_run( self, config: Config, - cli_crawlers: Optional[List[str]], - cli_skips: Optional[List[str]], - ) -> List[str]: + cli_crawlers: Optional[list[str]], + cli_skips: Optional[list[str]], + ) -> list[str]: log.explain_topic("Deciding which crawlers to run") - crawlers: List[str] + crawlers: list[str] if cli_crawlers is None: log.explain("No crawlers specified on CLI") log.explain("Running crawlers specified in config") @@ -104,7 +104,7 @@ class Pferd: def _load_crawlers(self) -> None: # Cookie sharing - kit_ilias_web_paths: Dict[Authenticator, List[Path]] = {} + kit_ilias_web_paths: dict[Authenticator, list[Path]] = {} for name, section in self._config.crawl_sections(): log.print(f"[bold bright_cyan]Loading[/] {escape(name)}") @@ -117,9 +117,8 @@ class Pferd: crawler = crawler_constructor(name, section, self._config, self._authenticators) self._crawlers[name] = crawler - if self._config.default_section.share_cookies(): - if isinstance(crawler, KitIliasWebCrawler): - crawler.share_cookies(kit_ilias_web_paths) + if self._config.default_section.share_cookies() and isinstance(crawler, KitIliasWebCrawler): + 
crawler.share_cookies(kit_ilias_web_paths) def debug_transforms(self) -> None: for name in self._crawlers_to_run: diff --git a/PFERD/report.py b/PFERD/report.py index 72e2727..5b37c1c 100644 --- a/PFERD/report.py +++ b/PFERD/report.py @@ -1,6 +1,6 @@ import json from pathlib import Path, PurePath -from typing import Any, Dict, List, Optional, Set +from typing import Any, Optional class ReportLoadError(Exception): @@ -42,32 +42,32 @@ class Report: def __init__(self) -> None: # Paths found by the crawler, untransformed - self.found_paths: Set[PurePath] = set() + self.found_paths: set[PurePath] = set() # Files reserved for metadata files (e. g. the report file or cookies) # that can't be overwritten by user transforms and won't be cleaned up # at the end. - self.reserved_files: Set[PurePath] = set() + self.reserved_files: set[PurePath] = set() # Files found by the crawler, transformed. Only includes files that # were downloaded (or a download was attempted) - self.known_files: Set[PurePath] = set() + self.known_files: set[PurePath] = set() - self.added_files: Set[PurePath] = set() - self.changed_files: Set[PurePath] = set() - self.deleted_files: Set[PurePath] = set() + self.added_files: set[PurePath] = set() + self.changed_files: set[PurePath] = set() + self.deleted_files: set[PurePath] = set() # Files that should have been deleted by the cleanup but weren't - self.not_deleted_files: Set[PurePath] = set() + self.not_deleted_files: set[PurePath] = set() # Custom crawler-specific data - self.custom: Dict[str, Any] = dict() + self.custom: dict[str, Any] = dict() # Encountered errors and warnings - self.encountered_warnings: List[str] = [] - self.encountered_errors: List[str] = [] + self.encountered_warnings: list[str] = [] + self.encountered_errors: list[str] = [] @staticmethod - def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]: + def _get_list_of_strs(data: dict[str, Any], key: str) -> list[str]: result: Any = data.get(key, []) if not isinstance(result, list): @@ -80,8 +80,8 @@ class Report: return result @staticmethod - def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]: - result: Dict[str, Any] = data.get(key, {}) + def _get_str_dictionary(data: dict[str, Any], key: str) -> dict[str, Any]: + result: dict[str, Any] = data.get(key, {}) if not isinstance(result, dict): raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary") @@ -170,7 +170,7 @@ class Report: self.known_files.add(path) @property - def marked(self) -> Set[PurePath]: + def marked(self) -> set[PurePath]: return self.known_files | self.reserved_files def is_marked(self, path: PurePath) -> bool: diff --git a/PFERD/transformer.py b/PFERD/transformer.py index 96b5ca7..2cfb28d 100644 --- a/PFERD/transformer.py +++ b/PFERD/transformer.py @@ -1,10 +1,12 @@ import ast +import contextlib import re from abc import ABC, abstractmethod +from collections.abc import Callable, Sequence from dataclasses import dataclass from enum import Enum from pathlib import PurePath -from typing import Callable, Dict, List, Optional, Sequence, TypeVar, Union +from typing import Optional, TypeVar from .logging import log from .utils import fmt_path, str_path @@ -23,7 +25,7 @@ class Empty: pass -RightSide = Union[str, Ignore, Empty] +RightSide = str | Ignore | Empty @dataclass @@ -35,7 +37,7 @@ class Ignored: pass -TransformResult = Optional[Union[Transformed, Ignored]] +TransformResult = Transformed | Ignored | None @dataclass @@ -47,7 +49,7 @@ class Rule: right: RightSide right_index: int - def 
right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]: + def right_result(self, path: PurePath) -> str | Transformed | Ignored: if isinstance(self.right, str): return self.right elif isinstance(self.right, Ignore): @@ -93,24 +95,20 @@ class ExactReTf(Transformation): # since elements of "match.groups()" can be None, mypy is wrong. groups: Sequence[Optional[str]] = [match[0]] + list(match.groups()) - locals_dir: Dict[str, Union[str, int, float]] = {} + locals_dir: dict[str, str | int | float] = {} for i, group in enumerate(groups): if group is None: continue locals_dir[f"g{i}"] = group - try: + with contextlib.suppress(ValueError): locals_dir[f"i{i}"] = int(group) - except ValueError: - pass - try: + with contextlib.suppress(ValueError): locals_dir[f"f{i}"] = float(group) - except ValueError: - pass - named_groups: Dict[str, str] = match.groupdict() + named_groups: dict[str, str] = match.groupdict() for name, capture in named_groups.items(): locals_dir[name] = capture @@ -228,7 +226,7 @@ class Line: self.expect(string) return value - def one_of(self, parsers: List[Callable[[], T]], description: str) -> T: + def one_of(self, parsers: list[Callable[[], T]], description: str) -> T: for parser in parsers: index = self.index try: @@ -315,7 +313,7 @@ def parse_left(line: Line) -> str: return parse_str(line) -def parse_right(line: Line) -> Union[str, Ignore]: +def parse_right(line: Line) -> str | Ignore: c = line.peek() if c in QUOTATION_MARKS: return parse_quoted_str(line) diff --git a/PFERD/utils.py b/PFERD/utils.py index acd282e..2d01713 100644 --- a/PFERD/utils.py +++ b/PFERD/utils.py @@ -3,10 +3,11 @@ import getpass import sys import threading from abc import ABC, abstractmethod +from collections.abc import Callable from contextlib import AsyncExitStack from pathlib import Path, PurePath from types import TracebackType -from typing import Any, Callable, Dict, Generic, Optional, Type, TypeVar +from typing import Any, Generic, Optional, TypeVar from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit import bs4 @@ -79,7 +80,7 @@ def url_set_query_param(url: str, param: str, value: str) -> str: return urlunsplit((scheme, netloc, path, new_query_string, fragment)) -def url_set_query_params(url: str, params: Dict[str, str]) -> str: +def url_set_query_params(url: str, params: dict[str, str]) -> str: """ Sets multiple query parameters in an url, overwriting existing ones. """ @@ -132,7 +133,7 @@ class ReusableAsyncContextManager(ABC, Generic[T]): async def __aexit__( self, - exc_type: Optional[Type[BaseException]], + exc_type: Optional[type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> Optional[bool]: