Fix ruff errors

I-Al-Istannen
2025-10-19 15:25:40 +02:00
parent 2cf0e060ed
commit 6e563134b2
26 changed files with 194 additions and 209 deletions
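
Most of the changes below fall into two buckets that ruff flags out of the box: moving from the deprecated typing generics (Dict, List, Tuple, Set, Callable, Union, ...) to the built-in generics and PEP 604 unions (rules in the UP family such as UP006/UP007/UP035), and adding explicit exception chaining when re-raising inside an except block (B904). A minimal sketch of both patterns, with made-up names, not taken from this diff:

from pathlib import Path

# Before: typing.Dict / typing.Optional and a bare re-raise inside except
# def load(paths: Dict[str, Path]) -> Optional[str]:
#     try:
#         return paths["config"].read_text()
#     except KeyError:
#         raise ValueError("no config path given")

# After: built-in generics, PEP 604 union, explicit chaining
def load(paths: dict[str, Path]) -> str | None:
    try:
        return paths["config"].read_text()
    except KeyError as e:
        # "from e" keeps the original traceback attached; "from None"
        # deliberately hides it for expected, user-facing errors
        raise ValueError("no config path given") from e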

View File

@@ -1,5 +1,5 @@
from collections.abc import Callable
from configparser import SectionProxy
from typing import Callable, Dict
from ..config import Config
from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401
@@ -18,7 +18,7 @@ AuthConstructor = Callable[
Authenticator,
]
AUTHENTICATORS: Dict[str, AuthConstructor] = {
AUTHENTICATORS: dict[str, AuthConstructor] = {
"credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c),
"keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)),
"pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)),

View File

@@ -1,5 +1,4 @@
from abc import ABC, abstractmethod
from typing import Tuple
from ..config import Section
@@ -35,7 +34,7 @@ class Authenticator(ABC):
self.name = name
@abstractmethod
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
pass
async def username(self) -> str:

View File

@@ -1,5 +1,4 @@
from pathlib import Path
from typing import Tuple
from ..config import Config
from ..utils import fmt_real_path
@@ -23,7 +22,9 @@ class CredentialFileAuthenticator(Authenticator):
with open(path, encoding="utf-8") as f:
lines = list(f)
except UnicodeDecodeError:
raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8")
raise AuthLoadError(
f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8"
) from None
except OSError as e:
raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e
@@ -42,5 +43,5 @@ class CredentialFileAuthenticator(Authenticator):
self._username = uline[9:]
self._password = pline[9:]
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
return self._username, self._password

View File

@@ -1,4 +1,4 @@
from typing import Optional, Tuple, cast
from typing import Optional, cast
import keyring
@@ -27,7 +27,7 @@ class KeyringAuthenticator(Authenticator):
self._password_invalidated = False
self._username_fixed = section.username() is not None
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
# Request the username
if self._username is None:
async with log.exclusive_output():

View File

@@ -1,6 +1,5 @@
import re
import subprocess
from typing import List, Tuple
from ..logging import log
from .authenticator import Authenticator, AuthError, AuthSection
@@ -12,11 +11,11 @@ class PassAuthSection(AuthSection):
self.missing_value("passname")
return value
def username_prefixes(self) -> List[str]:
def username_prefixes(self) -> list[str]:
value = self.s.get("username_prefixes", "login,username,user")
return [prefix.lower() for prefix in value.split(",")]
def password_prefixes(self) -> List[str]:
def password_prefixes(self) -> list[str]:
value = self.s.get("password_prefixes", "password,pass,secret")
return [prefix.lower() for prefix in value.split(",")]
@@ -31,14 +30,14 @@ class PassAuthenticator(Authenticator):
self._username_prefixes = section.username_prefixes()
self._password_prefixes = section.password_prefixes()
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
log.explain_topic("Obtaining credentials from pass")
try:
log.explain(f"Calling 'pass show {self._passname}'")
result = subprocess.check_output(["pass", "show", self._passname], text=True)
except subprocess.CalledProcessError as e:
raise AuthError(f"Failed to get password info from {self._passname}: {e}")
raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e
prefixed = {}
unprefixed = []

View File

@@ -1,4 +1,4 @@
from typing import Optional, Tuple
from typing import Optional
from ..logging import log
from ..utils import agetpass, ainput
@@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator):
self._username_fixed = self.username is not None
self._password_fixed = self.password is not None
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
if self._username is not None and self._password is not None:
return self._username, self._password

View File

@@ -1,5 +1,3 @@
from typing import Tuple
from ..logging import log
from ..utils import ainput
from .authenticator import Authenticator, AuthError
@@ -17,7 +15,7 @@ class TfaAuthenticator(Authenticator):
code = await ainput("TFA code: ")
return code
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
raise AuthError("TFA authenticator does not support usernames")
def invalidate_username(self) -> None:

View File

@@ -1,8 +1,9 @@
import argparse
import configparser
from argparse import ArgumentTypeError
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any, Callable, List, Optional, Sequence, Union
from typing import Any, Optional
from ..output_dir import OnConflict, Redownload
from ..version import NAME, VERSION
@@ -16,7 +17,7 @@ class ParserLoadError(Exception):
class BooleanOptionalAction(argparse.Action):
def __init__(
self,
option_strings: List[str],
option_strings: list[str],
dest: Any,
default: Any = None,
type: Any = None,
@@ -51,7 +52,7 @@ class BooleanOptionalAction(argparse.Action):
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
values: str | Sequence[Any] | None,
option_string: Optional[str] = None,
) -> None:
if option_string and option_string in self.option_strings:
@@ -72,7 +73,7 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]:
try:
return inner(input)
except ValueError as e:
raise ArgumentTypeError(e)
raise ArgumentTypeError(e) from e
return wrapper

View File

@@ -3,7 +3,7 @@ import os
import sys
from configparser import ConfigParser, SectionProxy
from pathlib import Path
from typing import Any, List, NoReturn, Optional, Tuple
from typing import Any, NoReturn, Optional
from rich.markup import escape
@@ -126,13 +126,13 @@ class Config:
with open(path, encoding="utf-8") as f:
parser.read_file(f, source=str(path))
except FileNotFoundError:
raise ConfigLoadError(path, "File does not exist")
raise ConfigLoadError(path, "File does not exist") from None
except IsADirectoryError:
raise ConfigLoadError(path, "That's a directory, not a file")
raise ConfigLoadError(path, "That's a directory, not a file") from None
except PermissionError:
raise ConfigLoadError(path, "Insufficient permissions")
raise ConfigLoadError(path, "Insufficient permissions") from None
except UnicodeDecodeError:
raise ConfigLoadError(path, "File is not encoded using UTF-8")
raise ConfigLoadError(path, "File is not encoded using UTF-8") from None
def dump(self, path: Optional[Path] = None) -> None:
"""
@@ -150,8 +150,8 @@ class Config:
try:
path.parent.mkdir(parents=True, exist_ok=True)
except PermissionError:
raise ConfigDumpError(path, "Could not create parent directory")
except PermissionError as e:
raise ConfigDumpError(path, "Could not create parent directory") from e
try:
# Ensuring we don't accidentally overwrite any existing files by
@@ -167,16 +167,16 @@ class Config:
with open(path, "w", encoding="utf-8") as f:
self._parser.write(f)
else:
raise ConfigDumpError(path, "File already exists")
raise ConfigDumpError(path, "File already exists") from None
except IsADirectoryError:
raise ConfigDumpError(path, "That's a directory, not a file")
except PermissionError:
raise ConfigDumpError(path, "Insufficient permissions")
raise ConfigDumpError(path, "That's a directory, not a file") from None
except PermissionError as e:
raise ConfigDumpError(path, "Insufficient permissions") from e
def dump_to_stdout(self) -> None:
self._parser.write(sys.stdout)
def crawl_sections(self) -> List[Tuple[str, SectionProxy]]:
def crawl_sections(self) -> list[tuple[str, SectionProxy]]:
result = []
for name, proxy in self._parser.items():
if name.startswith("crawl:"):
@@ -184,7 +184,7 @@ class Config:
return result
def auth_sections(self) -> List[Tuple[str, SectionProxy]]:
def auth_sections(self) -> list[tuple[str, SectionProxy]]:
result = []
for name, proxy in self._parser.items():
if name.startswith("auth:"):

View File

@@ -1,5 +1,5 @@
from collections.abc import Callable
from configparser import SectionProxy
from typing import Callable, Dict
from ..auth import Authenticator
from ..config import Config
@@ -13,12 +13,12 @@ CrawlerConstructor = Callable[
str, # Name (without the "crawl:" prefix)
SectionProxy, # Crawler's section of global config
Config, # Global config
Dict[str, Authenticator], # Loaded authenticators by name
dict[str, Authenticator], # Loaded authenticators by name
],
Crawler,
]
CRAWLERS: Dict[str, CrawlerConstructor] = {
CRAWLERS: dict[str, CrawlerConstructor] = {
"local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c),
"ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
"kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),

View File

@@ -1,10 +1,10 @@
import asyncio
import os
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Coroutine
from collections.abc import Awaitable, Callable, Coroutine, Sequence
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from typing import Any, Optional, TypeVar
from ..auth import Authenticator
from ..config import Config, Section
@@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
return bar
class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]):
def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath):
super().__init__()
@@ -128,7 +128,7 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
def path(self) -> PurePath:
return self._path
async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
async def _on_aenter(self) -> tuple[ProgressBar, FileSink]:
await self._stack.enter_async_context(self._limiter.limit_download())
sink = await self._stack.enter_async_context(self._fs_token)
# The "Downloaded ..." message is printed in the output dir, not here
@@ -205,7 +205,7 @@ class CrawlerSection(Section):
on_windows = os.name == "nt"
return self.s.getboolean("windows_paths", fallback=on_windows)
def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator:
def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator:
value = self.s.get("auth")
if value is None:
self.missing_value("auth")
@@ -262,7 +262,7 @@ class Crawler(ABC):
return self._output_dir
@staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]:
"""
Similar to asyncio.gather. However, in the case of an exception, all
still running tasks are cancelled and the exception is rethrown.
@@ -394,7 +394,7 @@ class Crawler(ABC):
log.warn("Couldn't find or load old report")
return
seen: Set[PurePath] = set()
seen: set[PurePath] = set()
for known in sorted(self.prev_report.found_paths):
looking_at = list(reversed(known.parents)) + [known]
for path in looking_at:

View File

@@ -3,7 +3,7 @@ import http.cookies
import ssl
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Tuple, cast
from typing import Any, Optional, cast
import aiohttp
import certifi
@@ -43,7 +43,7 @@ class HttpCrawler(Crawler):
self._http_timeout = section.http_timeout()
self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE)
self._shared_cookie_jar_paths: Optional[List[Path]] = None
self._shared_cookie_jar_paths: Optional[list[Path]] = None
self._shared_auth = shared_auth
self._output_dir.register_reserved(self.COOKIE_FILE)
@@ -98,7 +98,7 @@ class HttpCrawler(Crawler):
"""
raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation")
def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None:
def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None:
if not self._shared_auth:
return
@@ -219,7 +219,7 @@ class HttpCrawler(Crawler):
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the according value will be None.

View File

@@ -1,5 +1,6 @@
import asyncio
from typing import Any, Callable, Optional
from collections.abc import Callable
from typing import Any, Optional
import aiohttp
@@ -15,9 +16,9 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
try:
return await f(*args, **kwargs)
except aiohttp.ContentTypeError: # invalid content type
raise CrawlWarning("ILIAS returned an invalid content type")
raise CrawlWarning("ILIAS returned an invalid content type") from None
except aiohttp.TooManyRedirects:
raise CrawlWarning("Got stuck in a redirect loop")
raise CrawlWarning("Got stuck in a redirect loop") from None
except aiohttp.ClientPayloadError as e: # encoding or not enough bytes
last_exception = e
except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc.

View File

@@ -297,9 +297,7 @@ class Links(Enum):
raise ValueError("Missing switch case")
def collection_as_one(self) -> bool:
if self == Links.FANCY:
return True
return False
return self == Links.FANCY
def extension(self) -> Optional[str]:
if self == Links.FANCY:
@@ -355,4 +353,4 @@ class Links(Enum):
return Links(string)
except ValueError:
options = [f"'{option.value}'" for option in Links]
raise ValueError(f"must be one of {', '.join(options)}")
raise ValueError(f"must be one of {', '.join(options)}") from None

View File

@@ -4,7 +4,7 @@ import os
import re
from collections.abc import Awaitable, Coroutine
from pathlib import PurePath
from typing import Any, Dict, List, Literal, Optional, Set, Union, cast
from typing import Any, Literal, Optional, cast
from urllib.parse import urljoin
import aiohttp
@@ -33,7 +33,7 @@ from .kit_ilias_html import (
)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
TargetType = str | int
class LoginTypeLocal:
@@ -49,7 +49,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url
def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
def login(self) -> Literal["shibboleth"] | LoginTypeLocal:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
@@ -63,7 +63,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
@@ -110,7 +110,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = {
_DIRECTORY_PAGES: set[IliasElementType] = {
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.EXERCISE_OVERVIEW,
@@ -122,7 +122,7 @@ _DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
_VIDEO_ELEMENTS: Set[IliasElementType] = {
_VIDEO_ELEMENTS: set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO,
@@ -172,7 +172,7 @@ class IliasWebCrawler(HttpCrawler):
name: str,
section: IliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator],
authenticators: dict[str, Authenticator],
):
# Setting a main authenticator for cookie sharing
auth = section.auth(authenticators)
@@ -201,7 +201,7 @@ instance's greatest bottleneck.
self._links = section.links()
self._videos = section.videos()
self._forums = section.forums()
self._visited_urls: Dict[str, PurePath] = dict()
self._visited_urls: dict[str, PurePath] = dict()
async def _run(self) -> None:
if isinstance(self._target, int):
@@ -264,9 +264,9 @@ instance's greatest bottleneck.
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False,
) -> None:
elements: List[IliasPageElement] = []
elements: list[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
description: list[BeautifulSoup] = []
@_iorepeat(3, "crawling folder")
async def gather_elements() -> None:
@@ -309,7 +309,7 @@ instance's greatest bottleneck.
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses):
tasks.append(asyncio.create_task(handle))
@@ -340,8 +340,7 @@ instance's greatest bottleneck.
)
return None
if element.type in _VIDEO_ELEMENTS:
if not self._videos:
if element.type in _VIDEO_ELEMENTS and not self._videos:
log.status(
"[bold bright_black]",
"Ignored",
@@ -522,8 +521,8 @@ instance's greatest bottleneck.
sink.file.write(rendered.encode("utf-8"))
sink.done()
async def _resolve_link_target(self, export_url: str) -> Union[BeautifulSoup, Literal["none"]]:
async def impl() -> Optional[Union[BeautifulSoup, Literal["none"]]]:
async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]:
async def impl() -> Optional[BeautifulSoup | Literal["none"]]:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
@@ -658,7 +657,7 @@ instance's greatest bottleneck.
def _previous_contained_opencast_videos(
self, element: IliasPageElement, element_path: PurePath
) -> List[PurePath]:
) -> list[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element))
@@ -714,7 +713,7 @@ instance's greatest bottleneck.
add_to_report([str(self._transformer.transform(dl.path))])
return
contained_video_paths: List[str] = []
contained_video_paths: list[str] = []
for stream_element in stream_elements:
video_path = dl.path.parent / stream_element.name
@@ -832,7 +831,7 @@ instance's greatest bottleneck.
elements = parse_ilias_forum_export(soupify(export))
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
for thread in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url)))
@@ -842,7 +841,7 @@ instance's greatest bottleneck.
@anoncritical
@_iorepeat(3, "saving forum thread")
async def _download_forum_thread(
self, parent_path: PurePath, thread: Union[IliasForumThread, IliasPageElement], forum_url: str
self, parent_path: PurePath, thread: IliasForumThread | IliasPageElement, forum_url: str
) -> None:
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=thread.mtime)
@@ -871,7 +870,7 @@ instance's greatest bottleneck.
@_iorepeat(3, "crawling learning module")
@anoncritical
async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasLearningModulePage] = []
elements: list[IliasLearningModulePage] = []
async with cl:
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
@@ -891,7 +890,7 @@ instance's greatest bottleneck.
for index, lm_element in enumerate(elements):
lm_element.title = f"{index:02}_{lm_element.title}"
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
for index, elem in enumerate(elements):
prev_url = elements[index - 1].title if index > 0 else None
next_url = elements[index + 1].title if index < len(elements) - 1 else None
@@ -906,10 +905,10 @@ instance's greatest bottleneck.
self,
path: PurePath,
start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]],
dir: Literal["left"] | Literal["right"],
parent_element: IliasPageElement,
) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = []
) -> list[IliasLearningModulePage]:
elements: list[IliasLearningModulePage] = []
if not start_url:
return elements
@@ -923,10 +922,7 @@ instance's greatest bottleneck.
page = IliasPage(soup, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
next_element_url = next.previous_url
else:
next_element_url = next.next_url
next_element_url = next.previous_url if dir == "left" else next.next_url
counter += 1
return elements
@@ -950,16 +946,10 @@ instance's greatest bottleneck.
if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p:
prev = cast(str, os.path.relpath(prev_p, my_path.parent))
else:
prev = None
prev = cast(str, os.path.relpath(prev_p, my_path.parent)) if prev_p else None
if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p:
next = cast(str, os.path.relpath(next_p, my_path.parent))
else:
next = None
next = cast(str, os.path.relpath(next_p, my_path.parent)) if next_p else None
async with maybe_dl as (bar, sink):
content = element.content
@@ -973,8 +963,7 @@ instance's greatest bottleneck.
"""
log.explain_topic("Internalizing images")
for elem in tag.find_all(recursive=True):
if elem.name == "img":
if src := elem.attrs.get("src", None):
if elem.name == "img" and (src := elem.attrs.get("src", None)):
url = urljoin(self._base_url, cast(str, src))
if not url.startswith(self._base_url):
continue
@@ -1025,7 +1014,7 @@ instance's greatest bottleneck.
)
return soup
async def _post(self, url: str, data: dict[str, Union[str, List[str]]]) -> bytes:
async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes:
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)

View File

@@ -1,9 +1,10 @@
import json
import re
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Callable, Dict, Optional, Union, cast
from typing import Optional, cast
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
@@ -13,7 +14,7 @@ from PFERD.crawl.crawler import CrawlWarning
from PFERD.logging import log
from PFERD.utils import url_set_query_params
TargetType = Union[str, int]
TargetType = str | int
class TypeMatcher:
@@ -308,7 +309,7 @@ class IliasPageElement:
"""
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
if re.search(r"^[^-]+: ", meeting_name): # noqa: SIM108
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
@@ -331,7 +332,7 @@ class IliasPageElement:
@dataclass
class IliasDownloadForumData:
url: str
form_data: Dict[str, Union[str, list[str]]]
form_data: dict[str, str | list[str]]
empty: bool
@@ -433,9 +434,8 @@ class IliasPage:
for p in paragraphs:
if p.find_parent(class_=is_interesting_class):
continue
if "ilc_media_cont_MediaContainer" in p["class"]:
if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")):
# We have an embedded video which should be downloaded by _find_mob_videos
if video := p.select_one("video"):
url, title = self._find_mob_video_url_title(video, p)
raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;'
raw_html += "display: flex; justify-content: center; align-items: center;"
@@ -794,7 +794,7 @@ class IliasPage:
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_opencast_video_entries_paginated()
@@ -1164,6 +1164,9 @@ class IliasPage:
"""
found_titles = []
outer_accordion_content: Optional[Tag] = None
parents: list[Tag] = list(tag.parents)
@@ -1302,10 +1305,7 @@ class IliasPage:
),
)
caption_container = caption_parent.find_next_sibling("div")
if caption_container:
description = caption_container.get_text().strip()
else:
description = None
description = caption_container.get_text().strip() if caption_container else None
if not typ:
_unexpected_html_warning()
@@ -1444,9 +1444,7 @@ class IliasPage:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
return soup.select_one("#playerContainer") is not None
@staticmethod
def _find_date_in_text(text: str) -> Optional[datetime]:
@@ -1505,11 +1503,11 @@ def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[dateti
# Normalize whitespace because users
date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, flags=re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, flags=re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, flags=re.I)
date_str = date_str.strip()
for german, english in zip(german_months, english_months):
for german, english in zip(german_months, english_months, strict=True):
date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english)
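
Two of the changes in this file fix behaviour rather than style. re.sub takes count as its fourth positional argument, so the old re.sub(pattern, repl, date_str, re.I) passed re.I (numerically 2) as count and never enabled case-insensitive matching; it has to be spelled flags=re.I. And zip(..., strict=True) makes the German/English month translation fail loudly if the two lists ever drift out of sync. A small standalone sketch, with illustrative strings only:

import re

# Positional: re.I is silently treated as count=2, the case is NOT ignored
print(re.sub("heute", "today", "Heute oder heute?", re.I))         # Heute oder today?

# Keyword: case-insensitive as intended
print(re.sub("heute", "today", "Heute oder heute?", flags=re.I))   # today oder today?

# strict=True raises ValueError on length mismatch instead of truncating silently
print(list(zip(["Januar", "Februar"], ["January", "February"], strict=True)))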

View File

@@ -1,4 +1,4 @@
from typing import Dict, Literal
from typing import Literal
from ...auth import Authenticator
from ...config import Config
@@ -26,7 +26,7 @@ class KitIliasWebCrawler(IliasWebCrawler):
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator],
authenticators: dict[str, Authenticator],
):
super().__init__(name, section, config, authenticators)

View File

@@ -1,9 +1,11 @@
import os
import re
from collections.abc import Awaitable, Generator, Iterable
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast
from re import Pattern
from typing import Any, Optional, Union, cast
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@@ -44,7 +46,7 @@ class KitIpdFile:
@dataclass
class KitIpdFolder:
name: str
entries: List[Union[KitIpdFile, "KitIpdFolder"]]
entries: list[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
@@ -68,7 +70,7 @@ class KitIpdCrawler(HttpCrawler):
if not maybe_cl:
return
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
async with maybe_cl:
for item in await self._fetch_items():
@@ -120,9 +122,9 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, element_path, sink, bar)
async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
async def _fetch_items(self) -> Iterable[KitIpdFile | KitIpdFolder]:
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
elements: list[Tag] = self._find_file_links(page)
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
@@ -151,7 +153,7 @@ class KitIpdCrawler(HttpCrawler):
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]:
def _find_file_links(self, tag: Tag | BeautifulSoup) -> list[Tag]:
return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex}))
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
@@ -172,7 +174,7 @@ class KitIpdCrawler(HttpCrawler):
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async def get_page(self) -> tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This

View File

@@ -1,5 +1,5 @@
from collections.abc import Iterator
from pathlib import PurePath
from typing import Iterator, Set
from .logging import log
from .utils import fmt_path
@@ -43,7 +43,7 @@ class Deduplicator:
def __init__(self, windows_paths: bool) -> None:
self._windows_paths = windows_paths
self._known: Set[PurePath] = set()
self._known: set[PurePath] = set()
def _add(self, path: PurePath) -> None:
self._known.add(path)

View File

@@ -1,8 +1,9 @@
import asyncio
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Optional
from typing import Optional
@dataclass

View File

@@ -1,8 +1,9 @@
import asyncio
import sys
import traceback
from collections.abc import AsyncIterator, Iterator
from contextlib import AbstractContextManager, asynccontextmanager, contextmanager
from typing import AsyncIterator, Iterator, List, Optional
from typing import Optional
from rich.console import Console, Group
from rich.live import Live
@@ -60,7 +61,7 @@ class Log:
self._showing_progress = False
self._progress_suspended = False
self._lock = asyncio.Lock()
self._lines: List[str] = []
self._lines: list[str] = []
# Whether different parts of the output are enabled or disabled
self.output_explain = False

View File

@@ -4,12 +4,13 @@ import os
import random
import shutil
import string
from contextlib import contextmanager
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path, PurePath
from typing import BinaryIO, Iterator, Optional, Tuple
from typing import BinaryIO, Optional
from .logging import log
from .report import Report, ReportLoadError
@@ -35,7 +36,7 @@ class Redownload(Enum):
try:
return Redownload(string)
except ValueError:
raise ValueError("must be one of 'never', 'never-smart', 'always', 'always-smart'")
raise ValueError("must be one of 'never', 'never-smart', 'always', 'always-smart'") from None
class OnConflict(Enum):
@@ -53,7 +54,7 @@ class OnConflict(Enum):
raise ValueError(
"must be one of 'prompt', 'local-first',"
" 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'"
)
) from None
@dataclass
@@ -177,8 +178,8 @@ class OutputDirectory:
try:
self._root.mkdir(parents=True, exist_ok=True)
except OSError:
raise OutputDirError("Failed to create base directory")
except OSError as e:
raise OutputDirError("Failed to create base directory") from e
def register_reserved(self, path: PurePath) -> None:
self._report.mark_reserved(path)
@@ -358,7 +359,7 @@ class OutputDirectory:
async def _create_tmp_file(
self,
local_path: Path,
) -> Tuple[Path, BinaryIO]:
) -> tuple[Path, BinaryIO]:
"""
May raise an OutputDirError.
"""
@@ -509,10 +510,8 @@ class OutputDirectory:
await self._cleanup(child, pure_child)
if delete_self:
try:
with suppress(OSError):
path.rmdir()
except OSError:
pass
async def _cleanup_file(self, path: Path, pure: PurePath) -> None:
if self._report.is_marked(pure):

View File

@@ -1,5 +1,5 @@
from pathlib import Path, PurePath
from typing import Dict, List, Optional
from typing import Optional
from rich.markup import escape
@@ -15,7 +15,7 @@ class PferdLoadError(Exception):
class Pferd:
def __init__(self, config: Config, cli_crawlers: Optional[List[str]], cli_skips: Optional[List[str]]):
def __init__(self, config: Config, cli_crawlers: Optional[list[str]], cli_skips: Optional[list[str]]):
"""
May throw PferdLoadError.
"""
@@ -23,10 +23,10 @@ class Pferd:
self._config = config
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers, cli_skips)
self._authenticators: Dict[str, Authenticator] = {}
self._crawlers: Dict[str, Crawler] = {}
self._authenticators: dict[str, Authenticator] = {}
self._crawlers: dict[str, Crawler] = {}
def _find_config_crawlers(self, config: Config) -> List[str]:
def _find_config_crawlers(self, config: Config) -> list[str]:
crawl_sections = []
for name, section in config.crawl_sections():
@@ -37,7 +37,7 @@ class Pferd:
return crawl_sections
def _find_cli_crawlers(self, config: Config, cli_crawlers: List[str]) -> List[str]:
def _find_cli_crawlers(self, config: Config, cli_crawlers: list[str]) -> list[str]:
if len(cli_crawlers) != len(set(cli_crawlers)):
raise PferdLoadError("Some crawlers were selected multiple times")
@@ -68,12 +68,12 @@ class Pferd:
def _find_crawlers_to_run(
self,
config: Config,
cli_crawlers: Optional[List[str]],
cli_skips: Optional[List[str]],
) -> List[str]:
cli_crawlers: Optional[list[str]],
cli_skips: Optional[list[str]],
) -> list[str]:
log.explain_topic("Deciding which crawlers to run")
crawlers: List[str]
crawlers: list[str]
if cli_crawlers is None:
log.explain("No crawlers specified on CLI")
log.explain("Running crawlers specified in config")
@@ -104,7 +104,7 @@ class Pferd:
def _load_crawlers(self) -> None:
# Cookie sharing
kit_ilias_web_paths: Dict[Authenticator, List[Path]] = {}
kit_ilias_web_paths: dict[Authenticator, list[Path]] = {}
for name, section in self._config.crawl_sections():
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
@@ -117,8 +117,7 @@ class Pferd:
crawler = crawler_constructor(name, section, self._config, self._authenticators)
self._crawlers[name] = crawler
if self._config.default_section.share_cookies():
if isinstance(crawler, KitIliasWebCrawler):
if self._config.default_section.share_cookies() and isinstance(crawler, KitIliasWebCrawler):
crawler.share_cookies(kit_ilias_web_paths)
def debug_transforms(self) -> None:

View File

@@ -1,6 +1,6 @@
import json
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set
from typing import Any, Optional
class ReportLoadError(Exception):
@@ -42,32 +42,32 @@ class Report:
def __init__(self) -> None:
# Paths found by the crawler, untransformed
self.found_paths: Set[PurePath] = set()
self.found_paths: set[PurePath] = set()
# Files reserved for metadata files (e. g. the report file or cookies)
# that can't be overwritten by user transforms and won't be cleaned up
# at the end.
self.reserved_files: Set[PurePath] = set()
self.reserved_files: set[PurePath] = set()
# Files found by the crawler, transformed. Only includes files that
# were downloaded (or a download was attempted)
self.known_files: Set[PurePath] = set()
self.known_files: set[PurePath] = set()
self.added_files: Set[PurePath] = set()
self.changed_files: Set[PurePath] = set()
self.deleted_files: Set[PurePath] = set()
self.added_files: set[PurePath] = set()
self.changed_files: set[PurePath] = set()
self.deleted_files: set[PurePath] = set()
# Files that should have been deleted by the cleanup but weren't
self.not_deleted_files: Set[PurePath] = set()
self.not_deleted_files: set[PurePath] = set()
# Custom crawler-specific data
self.custom: Dict[str, Any] = dict()
self.custom: dict[str, Any] = dict()
# Encountered errors and warnings
self.encountered_warnings: List[str] = []
self.encountered_errors: List[str] = []
self.encountered_warnings: list[str] = []
self.encountered_errors: list[str] = []
@staticmethod
def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
def _get_list_of_strs(data: dict[str, Any], key: str) -> list[str]:
result: Any = data.get(key, [])
if not isinstance(result, list):
@@ -80,8 +80,8 @@ class Report:
return result
@staticmethod
def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
result: Dict[str, Any] = data.get(key, {})
def _get_str_dictionary(data: dict[str, Any], key: str) -> dict[str, Any]:
result: dict[str, Any] = data.get(key, {})
if not isinstance(result, dict):
raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")
@@ -170,7 +170,7 @@ class Report:
self.known_files.add(path)
@property
def marked(self) -> Set[PurePath]:
def marked(self) -> set[PurePath]:
return self.known_files | self.reserved_files
def is_marked(self, path: PurePath) -> bool:

View File

@@ -1,10 +1,12 @@
import ast
import contextlib
import re
from abc import ABC, abstractmethod
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from enum import Enum
from pathlib import PurePath
from typing import Callable, Dict, List, Optional, Sequence, TypeVar, Union
from typing import Optional, TypeVar
from .logging import log
from .utils import fmt_path, str_path
@@ -23,7 +25,7 @@ class Empty:
pass
RightSide = Union[str, Ignore, Empty]
RightSide = str | Ignore | Empty
@dataclass
@@ -35,7 +37,7 @@ class Ignored:
pass
TransformResult = Optional[Union[Transformed, Ignored]]
TransformResult = Transformed | Ignored | None
@dataclass
@@ -47,7 +49,7 @@ class Rule:
right: RightSide
right_index: int
def right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]:
def right_result(self, path: PurePath) -> str | Transformed | Ignored:
if isinstance(self.right, str):
return self.right
elif isinstance(self.right, Ignore):
@@ -93,24 +95,20 @@ class ExactReTf(Transformation):
# since elements of "match.groups()" can be None, mypy is wrong.
groups: Sequence[Optional[str]] = [match[0]] + list(match.groups())
locals_dir: Dict[str, Union[str, int, float]] = {}
locals_dir: dict[str, str | int | float] = {}
for i, group in enumerate(groups):
if group is None:
continue
locals_dir[f"g{i}"] = group
try:
with contextlib.suppress(ValueError):
locals_dir[f"i{i}"] = int(group)
except ValueError:
pass
try:
with contextlib.suppress(ValueError):
locals_dir[f"f{i}"] = float(group)
except ValueError:
pass
named_groups: Dict[str, str] = match.groupdict()
named_groups: dict[str, str] = match.groupdict()
for name, capture in named_groups.items():
locals_dir[name] = capture
@@ -228,7 +226,7 @@ class Line:
self.expect(string)
return value
def one_of(self, parsers: List[Callable[[], T]], description: str) -> T:
def one_of(self, parsers: list[Callable[[], T]], description: str) -> T:
for parser in parsers:
index = self.index
try:
@@ -315,7 +313,7 @@ def parse_left(line: Line) -> str:
return parse_str(line)
def parse_right(line: Line) -> Union[str, Ignore]:
def parse_right(line: Line) -> str | Ignore:
c = line.peek()
if c in QUOTATION_MARKS:
return parse_quoted_str(line)
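
For context on the contextlib.suppress change above: the old int/float parsing used try/except ValueError: pass, which ruff's SIM105 rule (from flake8-simplify) rewrites to a suppress block with identical semantics. A tiny sketch with a made-up capture group value:

import contextlib

group = "42x"
locals_dir: dict[str, str | int | float] = {"g1": group}

# Same as: try: locals_dir["i1"] = int(group) / except ValueError: pass
with contextlib.suppress(ValueError):
    locals_dir["i1"] = int(group)  # "42x" is not an int, so this assignment is skipped

print(locals_dir)  # {'g1': '42x'}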

View File

@@ -3,10 +3,11 @@ import getpass
import sys
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable
from contextlib import AsyncExitStack
from pathlib import Path, PurePath
from types import TracebackType
from typing import Any, Callable, Dict, Generic, Optional, Type, TypeVar
from typing import Any, Generic, Optional, TypeVar
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
import bs4
@@ -79,7 +80,7 @@ def url_set_query_param(url: str, param: str, value: str) -> str:
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def url_set_query_params(url: str, params: Dict[str, str]) -> str:
def url_set_query_params(url: str, params: dict[str, str]) -> str:
"""
Sets multiple query parameters in an url, overwriting existing ones.
"""
@@ -132,7 +133,7 @@ class ReusableAsyncContextManager(ABC, Generic[T]):
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]: