Compare commits

...

7 Commits

Author SHA1 Message Date
I-Al-Istannen
45e25db5ad Switch to uv 2025-10-19 16:21:23 +02:00
I-Al-Istannen
ef7d66c5af Fix some typing errors
It seems like the type hints have gotten better :)
2025-10-19 16:10:18 +02:00
I-Al-Istannen
5646e933fd Ignore reformat in git blame 2025-10-19 15:48:29 +02:00
I-Al-Istannen
6e563134b2 Fix ruff errors 2025-10-19 15:48:16 +02:00
I-Al-Istannen
2cf0e060ed Reformat and switch to ruff 2025-10-19 15:45:49 +02:00
I-Al-Istannen
ee4625be78 Hardcode max line length in scripts/check 2025-10-19 15:08:29 +02:00
I-Al-Istannen
f6c713d621 Fix mypy errors 2025-10-19 15:08:21 +02:00
42 changed files with 1887 additions and 823 deletions

1
.git-blame-ignore-revs Normal file
View File

@@ -0,0 +1 @@
2cf0e060ed126537dd993896b6aa793e2a6b9e80

View File

@@ -18,19 +18,13 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python }}
- name: Set up project
if: matrix.os != 'windows-latest'
run: ./scripts/setup
- name: Set up project on windows
if: matrix.os == 'windows-latest'
# For some reason, `pip install --upgrade pip` doesn't work on
# 'windows-latest'. The installed pip version works fine however.
run: ./scripts/setup --no-pip
run: uv sync
- name: Run checks
run: |

21
DEV.md
View File

@@ -9,30 +9,25 @@ particular [this][ppug-1] and [this][ppug-2] guide).
## Setting up a dev environment
The use of [venv][venv] is recommended. To initially set up a development
environment, run these commands in the same directory as this file:
The use of [venv][venv] and [uv][uv] is recommended. To initially set up a
development environment, run these commands in the same directory as this file:
```
$ python -m venv .venv
$ uv sync
$ . .venv/bin/activate
$ ./scripts/setup
```
The setup script installs a few required dependencies and tools. It also
installs PFERD via `pip install --editable .`, which means that you can just run
`pferd` as if it was installed normally. Since PFERD was installed with
`--editable`, there is no need to re-run `pip install` when the source code is
changed.
If you get any errors because pip can't update itself, try running
`./scripts/setup --no-pip` instead of `./scripts/setup`.
This installs all required dependencies and tools. It also installs PFERD as
*editable*, which means that you can just run `pferd` as if it was installed
normally. Since PFERD was installed with `--editable`, there is no need to
re-run `uv sync` when the source code is changed.
For more details, see [this part of the Python Tutorial][venv-tut] and
[this section on "development mode"][ppug-dev].
[venv]: <https://docs.python.org/3/library/venv.html> "venv - Creation of virtual environments"
[venv-tut]: <https://docs.python.org/3/tutorial/venv.html> "12. Virtual Environments and Packages"
[ppug-dev]: <https://packaging.python.org/guides/distributing-packages-using-setuptools/#working-in-development-mode> "Working in “development mode”"
[uv]: <https://docs.astral.sh/uv/> "uv - An extremely fast Python package and project manager"
## Checking and formatting the code

View File

@@ -1,5 +1,5 @@
from collections.abc import Callable
from configparser import SectionProxy
from typing import Callable, Dict
from ..config import Config
from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401
@@ -9,21 +9,19 @@ from .pass_ import PassAuthenticator, PassAuthSection
from .simple import SimpleAuthenticator, SimpleAuthSection
from .tfa import TfaAuthenticator
AuthConstructor = Callable[[
str, # Name (without the "auth:" prefix)
SectionProxy, # Authenticator's section of global config
Config, # Global config
], Authenticator]
AuthConstructor = Callable[
[
str, # Name (without the "auth:" prefix)
SectionProxy, # Authenticator's section of global config
Config, # Global config
],
Authenticator,
]
AUTHENTICATORS: Dict[str, AuthConstructor] = {
"credential-file": lambda n, s, c:
CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c),
"keyring": lambda n, s, c:
KeyringAuthenticator(n, KeyringAuthSection(s)),
"pass": lambda n, s, c:
PassAuthenticator(n, PassAuthSection(s)),
"simple": lambda n, s, c:
SimpleAuthenticator(n, SimpleAuthSection(s)),
"tfa": lambda n, s, c:
TfaAuthenticator(n),
AUTHENTICATORS: dict[str, AuthConstructor] = {
"credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c),
"keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)),
"pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)),
"simple": lambda n, s, c: SimpleAuthenticator(n, SimpleAuthSection(s)),
"tfa": lambda n, s, c: TfaAuthenticator(n),
}

View File

@@ -1,5 +1,4 @@
from abc import ABC, abstractmethod
from typing import Tuple
from ..config import Section
@@ -35,7 +34,7 @@ class Authenticator(ABC):
self.name = name
@abstractmethod
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
pass
async def username(self) -> str:

View File

@@ -1,5 +1,4 @@
from pathlib import Path
from typing import Tuple
from ..config import Config
from ..utils import fmt_real_path
@@ -23,7 +22,9 @@ class CredentialFileAuthenticator(Authenticator):
with open(path, encoding="utf-8") as f:
lines = list(f)
except UnicodeDecodeError:
raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8")
raise AuthLoadError(
f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8"
) from None
except OSError as e:
raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e
@@ -42,5 +43,5 @@ class CredentialFileAuthenticator(Authenticator):
self._username = uline[9:]
self._password = pline[9:]
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
return self._username, self._password

View File

@@ -1,4 +1,4 @@
from typing import Optional, Tuple, cast
from typing import Optional
import keyring
@@ -13,11 +13,10 @@ class KeyringAuthSection(AuthSection):
return self.s.get("username")
def keyring_name(self) -> str:
return cast(str, self.s.get("keyring_name", fallback=NAME))
return self.s.get("keyring_name", fallback=NAME)
class KeyringAuthenticator(Authenticator):
def __init__(self, name: str, section: KeyringAuthSection) -> None:
super().__init__(name)
@@ -28,7 +27,7 @@ class KeyringAuthenticator(Authenticator):
self._password_invalidated = False
self._username_fixed = section.username() is not None
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
# Request the username
if self._username is None:
async with log.exclusive_output():

View File

@@ -1,6 +1,5 @@
import re
import subprocess
from typing import List, Tuple
from ..logging import log
from .authenticator import Authenticator, AuthError, AuthSection
@@ -12,11 +11,11 @@ class PassAuthSection(AuthSection):
self.missing_value("passname")
return value
def username_prefixes(self) -> List[str]:
def username_prefixes(self) -> list[str]:
value = self.s.get("username_prefixes", "login,username,user")
return [prefix.lower() for prefix in value.split(",")]
def password_prefixes(self) -> List[str]:
def password_prefixes(self) -> list[str]:
value = self.s.get("password_prefixes", "password,pass,secret")
return [prefix.lower() for prefix in value.split(",")]
@@ -31,14 +30,14 @@ class PassAuthenticator(Authenticator):
self._username_prefixes = section.username_prefixes()
self._password_prefixes = section.password_prefixes()
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
log.explain_topic("Obtaining credentials from pass")
try:
log.explain(f"Calling 'pass show {self._passname}'")
result = subprocess.check_output(["pass", "show", self._passname], text=True)
except subprocess.CalledProcessError as e:
raise AuthError(f"Failed to get password info from {self._passname}: {e}")
raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e
prefixed = {}
unprefixed = []

View File

@@ -1,4 +1,4 @@
from typing import Optional, Tuple
from typing import Optional
from ..logging import log
from ..utils import agetpass, ainput
@@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator):
self._username_fixed = self.username is not None
self._password_fixed = self.password is not None
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
if self._username is not None and self._password is not None:
return self._username, self._password

View File

@@ -1,5 +1,3 @@
from typing import Tuple
from ..logging import log
from ..utils import ainput
from .authenticator import Authenticator, AuthError
@@ -17,7 +15,7 @@ class TfaAuthenticator(Authenticator):
code = await ainput("TFA code: ")
return code
async def credentials(self) -> Tuple[str, str]:
async def credentials(self) -> tuple[str, str]:
raise AuthError("TFA authenticator does not support usernames")
def invalidate_username(self) -> None:

View File

@@ -21,23 +21,20 @@ GROUP.add_argument(
"--base-url",
type=str,
metavar="BASE_URL",
help="The base url of the ilias instance"
help="The base url of the ilias instance",
)
GROUP.add_argument(
"--client-id",
type=str,
metavar="CLIENT_ID",
help="The client id of the ilias instance"
help="The client id of the ilias instance",
)
configure_common_group_args(GROUP)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
def load(args: argparse.Namespace, parser: configparser.ConfigParser) -> None:
log.explain(f"Creating config for command '{COMMAND_NAME}'")
parser["crawl:ilias"] = {}

View File

@@ -21,8 +21,8 @@ configure_common_group_args(GROUP)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain(f"Creating config for command '{COMMAND_NAME}'")

View File

@@ -18,25 +18,25 @@ GROUP.add_argument(
"--link-regex",
type=str,
metavar="REGEX",
help="href-matching regex to identify downloadable files"
help="href-matching regex to identify downloadable files",
)
GROUP.add_argument(
"target",
type=str,
metavar="TARGET",
help="url to crawl"
help="url to crawl",
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
help="output directory",
)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain("Creating config for command 'kit-ipd'")

View File

@@ -18,37 +18,37 @@ GROUP.add_argument(
"target",
type=Path,
metavar="TARGET",
help="directory to crawl"
help="directory to crawl",
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
help="output directory",
)
GROUP.add_argument(
"--crawl-delay",
type=float,
metavar="SECONDS",
help="artificial delay to simulate for crawl requests"
help="artificial delay to simulate for crawl requests",
)
GROUP.add_argument(
"--download-delay",
type=float,
metavar="SECONDS",
help="artificial delay to simulate for download requests"
help="artificial delay to simulate for download requests",
)
GROUP.add_argument(
"--download-speed",
type=int,
metavar="BYTES_PER_SECOND",
help="download speed to simulate"
help="download speed to simulate",
)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain("Creating config for command 'local'")

View File

@@ -12,58 +12,60 @@ def configure_common_group_args(group: argparse._ArgumentGroup) -> None:
"target",
type=str,
metavar="TARGET",
help="course id, 'desktop', or ILIAS URL to crawl"
help="course id, 'desktop', or ILIAS URL to crawl",
)
group.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
help="output directory",
)
group.add_argument(
"--username", "-u",
"--username",
"-u",
type=str,
metavar="USERNAME",
help="user name for authentication"
help="user name for authentication",
)
group.add_argument(
"--keyring",
action=BooleanOptionalAction,
help="use the system keyring to store and retrieve passwords"
help="use the system keyring to store and retrieve passwords",
)
group.add_argument(
"--credential-file",
type=Path,
metavar="PATH",
help="read username and password from a credential file"
help="read username and password from a credential file",
)
group.add_argument(
"--links",
type=show_value_error(Links.from_string),
metavar="OPTION",
help="how to represent external links"
help="how to represent external links",
)
group.add_argument(
"--link-redirect-delay",
type=int,
metavar="SECONDS",
help="time before 'fancy' links redirect to their target (-1 to disable)"
help="time before 'fancy' links redirect to their target (-1 to disable)",
)
group.add_argument(
"--videos",
action=BooleanOptionalAction,
help="crawl and download videos"
help="crawl and download videos",
)
group.add_argument(
"--forums",
action=BooleanOptionalAction,
help="crawl and download forum posts"
help="crawl and download forum posts",
)
group.add_argument(
"--http-timeout", "-t",
"--http-timeout",
"-t",
type=float,
metavar="SECONDS",
help="timeout for all HTTP requests"
help="timeout for all HTTP requests",
)

View File

@@ -1,8 +1,9 @@
import argparse
import configparser
from argparse import ArgumentTypeError
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any, Callable, List, Optional, Sequence, Union
from typing import Any, Optional
from ..output_dir import OnConflict, Redownload
from ..version import NAME, VERSION
@@ -15,15 +16,15 @@ class ParserLoadError(Exception):
# TODO Replace with argparse version when updating to 3.9?
class BooleanOptionalAction(argparse.Action):
def __init__(
self,
option_strings: List[str],
dest: Any,
default: Any = None,
type: Any = None,
choices: Any = None,
required: Any = False,
help: Any = None,
metavar: Any = None,
self,
option_strings: list[str],
dest: Any,
default: Any = None,
type: Any = None,
choices: Any = None,
required: Any = False,
help: Any = None,
metavar: Any = None,
):
if len(option_strings) != 1:
raise ValueError("There must be exactly one option string")
@@ -48,11 +49,11 @@ class BooleanOptionalAction(argparse.Action):
)
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Union[str, Sequence[Any], None],
option_string: Optional[str] = None,
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: Optional[str] = None,
) -> None:
if option_string and option_string in self.option_strings:
value = not option_string.startswith("--no-")
@@ -67,11 +68,13 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]:
Some validation functions (like the from_string in our enums) raise a ValueError.
Argparse only pretty-prints ArgumentTypeErrors though, so we need to wrap our ValueErrors.
"""
def wrapper(input: str) -> Any:
try:
return inner(input)
except ValueError as e:
raise ArgumentTypeError(e)
raise ArgumentTypeError(e) from e
return wrapper
@@ -81,52 +84,57 @@ CRAWLER_PARSER_GROUP = CRAWLER_PARSER.add_argument_group(
description="arguments common to all crawlers",
)
CRAWLER_PARSER_GROUP.add_argument(
"--redownload", "-r",
"--redownload",
"-r",
type=show_value_error(Redownload.from_string),
metavar="OPTION",
help="when to download a file that's already present locally"
help="when to download a file that's already present locally",
)
CRAWLER_PARSER_GROUP.add_argument(
"--on-conflict",
type=show_value_error(OnConflict.from_string),
metavar="OPTION",
help="what to do when local and remote files or directories differ"
help="what to do when local and remote files or directories differ",
)
CRAWLER_PARSER_GROUP.add_argument(
"--transform", "-T",
"--transform",
"-T",
action="append",
type=str,
metavar="RULE",
help="add a single transformation rule. Can be specified multiple times"
help="add a single transformation rule. Can be specified multiple times",
)
CRAWLER_PARSER_GROUP.add_argument(
"--tasks", "-n",
"--tasks",
"-n",
type=int,
metavar="N",
help="maximum number of concurrent tasks (crawling, downloading)"
help="maximum number of concurrent tasks (crawling, downloading)",
)
CRAWLER_PARSER_GROUP.add_argument(
"--downloads", "-N",
"--downloads",
"-N",
type=int,
metavar="N",
help="maximum number of tasks that may download data at the same time"
help="maximum number of tasks that may download data at the same time",
)
CRAWLER_PARSER_GROUP.add_argument(
"--task-delay", "-d",
"--task-delay",
"-d",
type=float,
metavar="SECONDS",
help="time the crawler should wait between subsequent tasks"
help="time the crawler should wait between subsequent tasks",
)
CRAWLER_PARSER_GROUP.add_argument(
"--windows-paths",
action=BooleanOptionalAction,
help="whether to repair invalid paths on windows"
help="whether to repair invalid paths on windows",
)
def load_crawler(
args: argparse.Namespace,
section: configparser.SectionProxy,
args: argparse.Namespace,
section: configparser.SectionProxy,
) -> None:
if args.redownload is not None:
section["redownload"] = args.redownload.value
@@ -152,79 +160,79 @@ PARSER.add_argument(
version=f"{NAME} {VERSION} (https://github.com/Garmelon/PFERD)",
)
PARSER.add_argument(
"--config", "-c",
"--config",
"-c",
type=Path,
metavar="PATH",
help="custom config file"
help="custom config file",
)
PARSER.add_argument(
"--dump-config",
action="store_true",
help="dump current configuration to the default config path and exit"
help="dump current configuration to the default config path and exit",
)
PARSER.add_argument(
"--dump-config-to",
metavar="PATH",
help="dump current configuration to a file and exit."
" Use '-' as path to print to stdout instead"
help="dump current configuration to a file and exit. Use '-' as path to print to stdout instead",
)
PARSER.add_argument(
"--debug-transforms",
action="store_true",
help="apply transform rules to files of previous run"
help="apply transform rules to files of previous run",
)
PARSER.add_argument(
"--crawler", "-C",
"--crawler",
"-C",
action="append",
type=str,
metavar="NAME",
help="only execute a single crawler."
" Can be specified multiple times to execute multiple crawlers"
help="only execute a single crawler. Can be specified multiple times to execute multiple crawlers",
)
PARSER.add_argument(
"--skip", "-S",
"--skip",
"-S",
action="append",
type=str,
metavar="NAME",
help="don't execute this particular crawler."
" Can be specified multiple times to skip multiple crawlers"
help="don't execute this particular crawler. Can be specified multiple times to skip multiple crawlers",
)
PARSER.add_argument(
"--working-dir",
type=Path,
metavar="PATH",
help="custom working directory"
help="custom working directory",
)
PARSER.add_argument(
"--explain",
action=BooleanOptionalAction,
help="log and explain in detail what PFERD is doing"
help="log and explain in detail what PFERD is doing",
)
PARSER.add_argument(
"--status",
action=BooleanOptionalAction,
help="print status updates while PFERD is crawling"
help="print status updates while PFERD is crawling",
)
PARSER.add_argument(
"--report",
action=BooleanOptionalAction,
help="print a report of all local changes before exiting"
help="print a report of all local changes before exiting",
)
PARSER.add_argument(
"--share-cookies",
action=BooleanOptionalAction,
help="whether crawlers should share cookies where applicable"
help="whether crawlers should share cookies where applicable",
)
PARSER.add_argument(
"--show-not-deleted",
action=BooleanOptionalAction,
help="print messages in status and report when PFERD did not delete a local only file"
help="print messages in status and report when PFERD did not delete a local only file",
)
def load_default_section(
args: argparse.Namespace,
parser: configparser.ConfigParser,
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
section = parser[parser.default_section]

View File

@@ -3,7 +3,7 @@ import os
import sys
from configparser import ConfigParser, SectionProxy
from pathlib import Path
from typing import Any, List, NoReturn, Optional, Tuple
from typing import Any, NoReturn, Optional
from rich.markup import escape
@@ -53,10 +53,10 @@ class Section:
raise ConfigOptionError(self.s.name, key, desc)
def invalid_value(
self,
key: str,
value: Any,
reason: Optional[str],
self,
key: str,
value: Any,
reason: Optional[str],
) -> NoReturn:
if reason is None:
self.error(key, f"Invalid value {value!r}")
@@ -126,13 +126,13 @@ class Config:
with open(path, encoding="utf-8") as f:
parser.read_file(f, source=str(path))
except FileNotFoundError:
raise ConfigLoadError(path, "File does not exist")
raise ConfigLoadError(path, "File does not exist") from None
except IsADirectoryError:
raise ConfigLoadError(path, "That's a directory, not a file")
raise ConfigLoadError(path, "That's a directory, not a file") from None
except PermissionError:
raise ConfigLoadError(path, "Insufficient permissions")
raise ConfigLoadError(path, "Insufficient permissions") from None
except UnicodeDecodeError:
raise ConfigLoadError(path, "File is not encoded using UTF-8")
raise ConfigLoadError(path, "File is not encoded using UTF-8") from None
def dump(self, path: Optional[Path] = None) -> None:
"""
@@ -150,8 +150,8 @@ class Config:
try:
path.parent.mkdir(parents=True, exist_ok=True)
except PermissionError:
raise ConfigDumpError(path, "Could not create parent directory")
except PermissionError as e:
raise ConfigDumpError(path, "Could not create parent directory") from e
try:
# Ensuring we don't accidentally overwrite any existing files by
@@ -167,16 +167,16 @@ class Config:
with open(path, "w", encoding="utf-8") as f:
self._parser.write(f)
else:
raise ConfigDumpError(path, "File already exists")
raise ConfigDumpError(path, "File already exists") from None
except IsADirectoryError:
raise ConfigDumpError(path, "That's a directory, not a file")
except PermissionError:
raise ConfigDumpError(path, "Insufficient permissions")
raise ConfigDumpError(path, "That's a directory, not a file") from None
except PermissionError as e:
raise ConfigDumpError(path, "Insufficient permissions") from e
def dump_to_stdout(self) -> None:
self._parser.write(sys.stdout)
def crawl_sections(self) -> List[Tuple[str, SectionProxy]]:
def crawl_sections(self) -> list[tuple[str, SectionProxy]]:
result = []
for name, proxy in self._parser.items():
if name.startswith("crawl:"):
@@ -184,7 +184,7 @@ class Config:
return result
def auth_sections(self) -> List[Tuple[str, SectionProxy]]:
def auth_sections(self) -> list[tuple[str, SectionProxy]]:
result = []
for name, proxy in self._parser.items():
if name.startswith("auth:"):

View File

@@ -1,5 +1,5 @@
from collections.abc import Callable
from configparser import SectionProxy
from typing import Callable, Dict
from ..auth import Authenticator
from ..config import Config
@@ -8,20 +8,19 @@ from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler,
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection
CrawlerConstructor = Callable[[
str, # Name (without the "crawl:" prefix)
SectionProxy, # Crawler's section of global config
Config, # Global config
Dict[str, Authenticator], # Loaded authenticators by name
], Crawler]
CrawlerConstructor = Callable[
[
str, # Name (without the "crawl:" prefix)
SectionProxy, # Crawler's section of global config
Config, # Global config
dict[str, Authenticator], # Loaded authenticators by name
],
Crawler,
]
CRAWLERS: Dict[str, CrawlerConstructor] = {
"local": lambda n, s, c, a:
LocalCrawler(n, LocalCrawlerSection(s), c),
"ilias-web": lambda n, s, c, a:
IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
"kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a:
KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
CRAWLERS: dict[str, CrawlerConstructor] = {
"local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c),
"ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
"kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a: KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
}

View File

@@ -1,10 +1,10 @@
import asyncio
import os
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Coroutine
from collections.abc import Awaitable, Callable, Coroutine, Sequence
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from typing import Any, Optional, TypeVar
from ..auth import Authenticator
from ..config import Config, Section
@@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
return bar
class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]):
def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath):
super().__init__()
@@ -128,12 +128,13 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
def path(self) -> PurePath:
return self._path
async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
async def _on_aenter(self) -> tuple[ProgressBar, FileSink]:
await self._stack.enter_async_context(self._limiter.limit_download())
sink = await self._stack.enter_async_context(self._fs_token)
# The "Downloaded ..." message is printed in the output dir, not here
bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading",
fmt_path(self._path)))
bar = self._stack.enter_context(
log.download_bar("[bold bright_cyan]", "Downloading", fmt_path(self._path))
)
return bar, sink
@@ -204,7 +205,7 @@ class CrawlerSection(Section):
on_windows = os.name == "nt"
return self.s.getboolean("windows_paths", fallback=on_windows)
def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator:
def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator:
value = self.s.get("auth")
if value is None:
self.missing_value("auth")
@@ -216,10 +217,10 @@ class CrawlerSection(Section):
class Crawler(ABC):
def __init__(
self,
name: str,
section: CrawlerSection,
config: Config,
self,
name: str,
section: CrawlerSection,
config: Config,
) -> None:
"""
Initialize a crawler from its name and its section in the config file.
@@ -261,7 +262,7 @@ class Crawler(ABC):
return self._output_dir
@staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]:
"""
Similar to asyncio.gather. However, in the case of an exception, all
still running tasks are cancelled and the exception is rethrown.
@@ -293,13 +294,13 @@ class Crawler(ABC):
return CrawlToken(self._limiter, path)
def should_try_download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> bool:
log.explain_topic(f"Decision: Should Download {fmt_path(path)}")
@@ -308,11 +309,7 @@ class Crawler(ABC):
return False
should_download = self._output_dir.should_try_download(
path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
path, etag_differs=etag_differs, mtime=mtime, redownload=redownload, on_conflict=on_conflict
)
if should_download:
log.explain("Answer: Yes")
@@ -322,13 +319,13 @@ class Crawler(ABC):
return False
async def download(
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
self,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> Optional[DownloadToken]:
log.explain_topic(f"Decision: Download {fmt_path(path)}")
path = self._deduplicator.mark(path)
@@ -346,7 +343,7 @@ class Crawler(ABC):
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
on_conflict=on_conflict,
)
if fs_token is None:
log.explain("Answer: No")
@@ -397,7 +394,7 @@ class Crawler(ABC):
log.warn("Couldn't find or load old report")
return
seen: Set[PurePath] = set()
seen: set[PurePath] = set()
for known in sorted(self.prev_report.found_paths):
looking_at = list(reversed(known.parents)) + [known]
for path in looking_at:

View File

@@ -3,7 +3,7 @@ import http.cookies
import ssl
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Tuple, cast
from typing import Any, Optional
import aiohttp
import certifi
@@ -29,11 +29,11 @@ class HttpCrawler(Crawler):
COOKIE_FILE = PurePath(".cookies")
def __init__(
self,
name: str,
section: HttpCrawlerSection,
config: Config,
shared_auth: Optional[Authenticator] = None,
self,
name: str,
section: HttpCrawlerSection,
config: Config,
shared_auth: Optional[Authenticator] = None,
) -> None:
super().__init__(name, section, config)
@@ -43,7 +43,7 @@ class HttpCrawler(Crawler):
self._http_timeout = section.http_timeout()
self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE)
self._shared_cookie_jar_paths: Optional[List[Path]] = None
self._shared_cookie_jar_paths: Optional[list[Path]] = None
self._shared_auth = shared_auth
self._output_dir.register_reserved(self.COOKIE_FILE)
@@ -98,7 +98,7 @@ class HttpCrawler(Crawler):
"""
raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation")
def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None:
def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None:
if not self._shared_auth:
return
@@ -187,7 +187,7 @@ class HttpCrawler(Crawler):
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = cast(Optional[Tag], tag.find_previous(name=f"h{level}"))
level_heading = tag.find_previous(name=f"h{level}")
if level_heading is None:
return find_associated_headings(tag, level - 1)
@@ -219,7 +219,7 @@ class HttpCrawler(Crawler):
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the according value will be None.
@@ -252,23 +252,23 @@ class HttpCrawler(Crawler):
self._load_cookies()
async with aiohttp.ClientSession(
headers={"User-Agent": f"{NAME}/{VERSION}"},
cookie_jar=self._cookie_jar,
connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())),
timeout=ClientTimeout(
# 30 minutes. No download in the history of downloads was longer than 30 minutes.
# This is enough to transfer a 600 MB file over a 3 Mib/s connection.
# Allowing an arbitrary value could be annoying for overnight batch jobs
total=15 * 60,
connect=self._http_timeout,
sock_connect=self._http_timeout,
sock_read=self._http_timeout,
),
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False
headers={"User-Agent": f"{NAME}/{VERSION}"},
cookie_jar=self._cookie_jar,
connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())),
timeout=ClientTimeout(
# 30 minutes. No download in the history of downloads was longer than 30 minutes.
# This is enough to transfer a 600 MB file over a 3 Mib/s connection.
# Allowing an arbitrary value could be annoying for overnight batch jobs
total=15 * 60,
connect=self._http_timeout,
sock_connect=self._http_timeout,
sock_read=self._http_timeout,
),
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False,
) as session:
self.session = session
try:

View File

@@ -1,5 +1,9 @@
from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler,
KitIliasWebCrawlerSection)
from .kit_ilias_web_crawler import (
IliasWebCrawler,
IliasWebCrawlerSection,
KitIliasWebCrawler,
KitIliasWebCrawlerSection,
)
__all__ = [
"IliasWebCrawler",

View File

@@ -1,5 +1,6 @@
import asyncio
from typing import Any, Callable, Optional
from collections.abc import Callable
from typing import Any, Optional
import aiohttp
@@ -15,9 +16,9 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla
try:
return await f(*args, **kwargs)
except aiohttp.ContentTypeError: # invalid content type
raise CrawlWarning("ILIAS returned an invalid content type")
raise CrawlWarning("ILIAS returned an invalid content type") from None
except aiohttp.TooManyRedirects:
raise CrawlWarning("Got stuck in a redirect loop")
raise CrawlWarning("Got stuck in a redirect loop") from None
except aiohttp.ClientPayloadError as e: # encoding or not enough bytes
last_exception = e
except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc.

View File

@@ -254,21 +254,22 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
)
if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"):
bot_nav.replace_with(soupify(nav_template.replace(
"{{left}}", left).replace("{{right}}", right).encode())
bot_nav.replace_with(
soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode())
)
body_str = cast(str, body.prettify())
body_str = body.prettify()
return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name)
def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str:
if title := cast(Optional[bs4.Tag], heading.find(name="b")):
if title := heading.find(name="b"):
title.wrap(bs4.Tag(name="a", attrs={"href": url}))
return _forum_thread_template \
.replace("{{name}}", name) \
.replace("{{heading}}", cast(str, heading.prettify())) \
.replace("{{content}}", cast(str, content.prettify()))
return (
_forum_thread_template.replace("{{name}}", name)
.replace("{{heading}}", heading.prettify())
.replace("{{content}}", content.prettify())
)
@dataclasses.dataclass
@@ -296,9 +297,7 @@ class Links(Enum):
raise ValueError("Missing switch case")
def collection_as_one(self) -> bool:
if self == Links.FANCY:
return True
return False
return self == Links.FANCY
def extension(self) -> Optional[str]:
if self == Links.FANCY:
@@ -330,8 +329,7 @@ class Links(Enum):
# All others get coerced to fancy
content = cast(str, Links.FANCY.template())
repeated_content = cast(
re.Match[str],
re.search(r"<!-- REPEAT START -->([\s\S]+)<!-- REPEAT END -->", content)
re.Match[str], re.search(r"<!-- REPEAT START -->([\s\S]+)<!-- REPEAT END -->", content)
).group(1)
parts = []
@@ -355,4 +353,4 @@ class Links(Enum):
return Links(string)
except ValueError:
options = [f"'{option.value}'" for option in Links]
raise ValueError(f"must be one of {', '.join(options)}")
raise ValueError(f"must be one of {', '.join(options)}") from None

View File

@@ -4,7 +4,7 @@ import os
import re
from collections.abc import Awaitable, Coroutine
from pathlib import PurePath
from typing import Any, Dict, List, Literal, Optional, Set, Union, cast
from typing import Any, Literal, Optional, cast
from urllib.parse import urljoin
import aiohttp
@@ -21,11 +21,19 @@ from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .async_helper import _iorepeat
from .file_templates import LinkData, Links, forum_thread_template, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, IliasSoup, _sanitize_path_name, parse_ilias_forum_export)
from .kit_ilias_html import (
IliasElementType,
IliasForumThread,
IliasLearningModulePage,
IliasPage,
IliasPageElement,
IliasSoup,
_sanitize_path_name,
parse_ilias_forum_export,
)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
TargetType = str | int
class LoginTypeLocal:
@@ -41,7 +49,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url
def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
def login(self) -> Literal["shibboleth"] | LoginTypeLocal:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
@@ -55,9 +63,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
@@ -104,7 +110,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = {
_DIRECTORY_PAGES: set[IliasElementType] = {
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.EXERCISE_OVERVIEW,
@@ -116,7 +122,7 @@ _DIRECTORY_PAGES: Set[IliasElementType] = {
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
}
_VIDEO_ELEMENTS: Set[IliasElementType] = {
_VIDEO_ELEMENTS: set[IliasElementType] = {
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO,
@@ -166,17 +172,19 @@ class IliasWebCrawler(HttpCrawler):
name: str,
section: IliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator]
authenticators: dict[str, Authenticator],
):
# Setting a main authenticator for cookie sharing
auth = section.auth(authenticators)
super().__init__(name, section, config, shared_auth=auth)
if section.tasks() > 1:
log.warn("""
log.warn(
"""
Please avoid using too many parallel requests as these are the KIT ILIAS
instance's greatest bottleneck.
""".strip())
""".strip()
)
self._auth = auth
self._base_url = section.base_url()
@@ -193,7 +201,7 @@ instance's greatest bottleneck.
self._links = section.links()
self._videos = section.videos()
self._forums = section.forums()
self._visited_urls: Dict[str, PurePath] = dict()
self._visited_urls: dict[str, PurePath] = dict()
async def _run(self) -> None:
if isinstance(self._target, int):
@@ -210,22 +218,19 @@ instance's greatest bottleneck.
# Start crawling at the given course
root_url = url_set_query_param(
urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}",
"target",
f"crs_{course_id}",
)
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
await self._crawl_url(
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"),
crawl_nested_courses=True
urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), crawl_nested_courses=True
)
async def _crawl_url(
self,
url: str,
expected_id: Optional[int] = None,
crawl_nested_courses: bool = False
self, url: str, expected_id: Optional[int] = None, crawl_nested_courses: bool = False
) -> None:
if awaitable := await self._handle_ilias_page(
url, None, PurePath("."), expected_id, crawl_nested_courses
@@ -238,7 +243,7 @@ instance's greatest bottleneck.
current_element: Optional[IliasPageElement],
path: PurePath,
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False
crawl_nested_courses: bool = False,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
@@ -259,9 +264,9 @@ instance's greatest bottleneck.
expected_course_id: Optional[int] = None,
crawl_nested_courses: bool = False,
) -> None:
elements: List[IliasPageElement] = []
elements: list[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
description: list[BeautifulSoup] = []
@_iorepeat(3, "crawling folder")
async def gather_elements() -> None:
@@ -304,7 +309,7 @@ instance's greatest bottleneck.
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses):
tasks.append(asyncio.create_task(handle))
@@ -319,10 +324,7 @@ instance's greatest bottleneck.
# works correctly.
@anoncritical
async def _handle_ilias_element(
self,
parent_path: PurePath,
element: IliasPageElement,
crawl_nested_courses: bool = False
self, parent_path: PurePath, element: IliasPageElement, crawl_nested_courses: bool = False
) -> Optional[Coroutine[Any, Any, None]]:
# element.name might contain `/` if the crawler created nested elements,
# so we can not sanitize it here. We trust in the output dir to thwart worst-case
@@ -338,15 +340,14 @@ instance's greatest bottleneck.
)
return None
if element.type in _VIDEO_ELEMENTS:
if not self._videos:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'videos')"
)
return None
if element.type in _VIDEO_ELEMENTS and not self._videos:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'videos')",
)
return None
if element.type == IliasElementType.FILE:
return await self._handle_file(element, element_path)
@@ -356,7 +357,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'forums')"
"[bright_black](enable with option 'forums')",
)
return None
return await self._handle_forum(element, element_path)
@@ -365,7 +366,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](tests contain no relevant data)"
"[bright_black](tests contain no relevant data)",
)
return None
elif element.type == IliasElementType.SURVEY:
@@ -373,7 +374,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](surveys contain no relevant data)"
"[bright_black](surveys contain no relevant data)",
)
return None
elif element.type == IliasElementType.SCORM_LEARNING_MODULE:
@@ -381,7 +382,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](scorm learning modules are not supported)"
"[bright_black](scorm learning modules are not supported)",
)
return None
elif element.type == IliasElementType.LITERATURE_LIST:
@@ -389,7 +390,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](literature lists are not currently supported)"
"[bright_black](literature lists are not currently supported)",
)
return None
elif element.type == IliasElementType.LEARNING_MODULE_HTML:
@@ -397,7 +398,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](HTML learning modules are not supported)"
"[bright_black](HTML learning modules are not supported)",
)
return None
elif element.type == IliasElementType.BLOG:
@@ -405,7 +406,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](blogs are not currently supported)"
"[bright_black](blogs are not currently supported)",
)
return None
elif element.type == IliasElementType.DCL_RECORD_LIST:
@@ -413,7 +414,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](dcl record lists are not currently supported)"
"[bright_black](dcl record lists are not currently supported)",
)
return None
elif element.type == IliasElementType.MEDIA_POOL:
@@ -421,7 +422,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](media pools are not currently supported)"
"[bright_black](media pools are not currently supported)",
)
return None
elif element.type == IliasElementType.COURSE:
@@ -431,7 +432,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](not descending into linked course)"
"[bright_black](not descending into linked course)",
)
return None
elif element.type == IliasElementType.WIKI:
@@ -439,7 +440,7 @@ instance's greatest bottleneck.
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](wikis are not currently supported)"
"[bright_black](wikis are not currently supported)",
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
@@ -513,19 +514,15 @@ instance's greatest bottleneck.
@anoncritical
@_iorepeat(3, "resolving link")
async def _download_link(
self,
link_renderer: Links,
collection_name: str,
links: list[LinkData],
dl: DownloadToken
self, link_renderer: Links, collection_name: str, links: list[LinkData], dl: DownloadToken
) -> None:
async with dl as (bar, sink):
rendered = link_renderer.interpolate(self._link_file_redirect_delay, collection_name, links)
sink.file.write(rendered.encode("utf-8"))
sink.done()
async def _resolve_link_target(self, export_url: str) -> Union[BeautifulSoup, Literal['none']]:
async def impl() -> Optional[Union[BeautifulSoup, Literal['none']]]:
async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]:
async def impl() -> Optional[BeautifulSoup | Literal["none"]]:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
@@ -551,7 +548,7 @@ instance's greatest bottleneck.
@staticmethod
def _parse_link_content(element: IliasPageElement, content: BeautifulSoup) -> list[LinkData]:
links = cast(list[Tag], list(content.select("a")))
links = list(content.select("a"))
if len(links) == 1:
url = str(links[0].get("href")).strip()
return [LinkData(name=element.name, description=element.description or "", url=url)]
@@ -601,7 +598,7 @@ instance's greatest bottleneck.
async with dl as (_bar, sink):
description = clean(insert_base_markup(description))
description_tag = await self.internalize_images(description)
sink.file.write(cast(str, description_tag.prettify()).encode("utf-8"))
sink.file.write(description_tag.prettify().encode("utf-8"))
sink.done()
@anoncritical
@@ -626,7 +623,7 @@ instance's greatest bottleneck.
if self.prev_report:
self.report.add_custom_value(
_get_video_cache_key(element),
self.prev_report.get_custom_value(_get_video_cache_key(element))
self.prev_report.get_custom_value(_get_video_cache_key(element)),
)
# A video might contain other videos, so let's "crawl" the video first
@@ -660,7 +657,7 @@ instance's greatest bottleneck.
def _previous_contained_opencast_videos(
self, element: IliasPageElement, element_path: PurePath
) -> List[PurePath]:
) -> list[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element))
@@ -698,7 +695,7 @@ instance's greatest bottleneck.
def add_to_report(paths: list[str]) -> None:
self.report.add_custom_value(
_get_video_cache_key(element),
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))},
)
async with dl as (bar, sink):
@@ -716,7 +713,7 @@ instance's greatest bottleneck.
add_to_report([str(self._transformer.transform(dl.path))])
return
contained_video_paths: List[str] = []
contained_video_paths: list[str] = []
for stream_element in stream_elements:
video_path = dl.path.parent / stream_element.name
@@ -752,11 +749,7 @@ instance's greatest bottleneck.
await self._stream_from_url(element, sink, bar, is_video)
async def _stream_from_url(
self,
element: IliasPageElement,
sink: FileSink,
bar: ProgressBar,
is_video: bool
self, element: IliasPageElement, sink: FileSink, bar: ProgressBar, is_video: bool
) -> None:
url = element.url
@@ -831,14 +824,14 @@ instance's greatest bottleneck.
log.warn("Could not extract forum export url")
return
export = await self._post(export_url, {
"format": "html",
"cmd[createExportFile]": ""
})
export = await self._post(
export_url,
{"format": "html", "cmd[createExportFile]": ""},
)
elements = parse_ilias_forum_export(soupify(export))
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
for thread in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url)))
@@ -848,10 +841,7 @@ instance's greatest bottleneck.
@anoncritical
@_iorepeat(3, "saving forum thread")
async def _download_forum_thread(
self,
parent_path: PurePath,
thread: Union[IliasForumThread, IliasPageElement],
forum_url: str
self, parent_path: PurePath, thread: IliasForumThread | IliasPageElement, forum_url: str
) -> None:
path = parent_path / (_sanitize_path_name(thread.name) + ".html")
maybe_dl = await self.download(path, mtime=thread.mtime)
@@ -860,10 +850,7 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink):
rendered = forum_thread_template(
thread.name,
forum_url,
thread.name_tag,
await self.internalize_images(thread.content_tag)
thread.name, forum_url, thread.name_tag, await self.internalize_images(thread.content_tag)
)
sink.file.write(rendered.encode("utf-8"))
sink.done()
@@ -883,7 +870,7 @@ instance's greatest bottleneck.
@_iorepeat(3, "crawling learning module")
@anoncritical
async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasLearningModulePage] = []
elements: list[IliasLearningModulePage] = []
async with cl:
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
@@ -891,25 +878,25 @@ instance's greatest bottleneck.
soup = await self._get_page(element.url)
page = IliasPage(soup, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element
))
elements.extend(
await self._crawl_learning_module_direction(cl.path, next.previous_url, "left", element)
)
elements.append(next)
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.next_url, "right", element
))
elements.extend(
await self._crawl_learning_module_direction(cl.path, next.next_url, "right", element)
)
# Reflect their natural ordering in the file names
for index, lm_element in enumerate(elements):
lm_element.title = f"{index:02}_{lm_element.title}"
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
for index, elem in enumerate(elements):
prev_url = elements[index - 1].title if index > 0 else None
next_url = elements[index + 1].title if index < len(elements) - 1 else None
tasks.append(asyncio.create_task(
self._download_learning_module_page(cl.path, elem, prev_url, next_url)
))
tasks.append(
asyncio.create_task(self._download_learning_module_page(cl.path, elem, prev_url, next_url))
)
# And execute them
await self.gather(tasks)
@@ -918,10 +905,10 @@ instance's greatest bottleneck.
self,
path: PurePath,
start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]],
parent_element: IliasPageElement
) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = []
dir: Literal["left"] | Literal["right"],
parent_element: IliasPageElement,
) -> list[IliasLearningModulePage]:
elements: list[IliasLearningModulePage] = []
if not start_url:
return elements
@@ -935,10 +922,7 @@ instance's greatest bottleneck.
page = IliasPage(soup, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
next_element_url = next.previous_url
else:
next_element_url = next.next_url
next_element_url = next.previous_url if dir == "left" else next.next_url
counter += 1
return elements
@@ -950,7 +934,7 @@ instance's greatest bottleneck.
parent_path: PurePath,
element: IliasLearningModulePage,
prev: Optional[str],
next: Optional[str]
next: Optional[str],
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path)
@@ -962,16 +946,10 @@ instance's greatest bottleneck.
if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p:
prev = cast(str, os.path.relpath(prev_p, my_path.parent))
else:
prev = None
prev = os.path.relpath(prev_p, my_path.parent) if prev_p else None
if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p:
next = cast(str, os.path.relpath(next_p, my_path.parent))
else:
next = None
next = os.path.relpath(next_p, my_path.parent) if next_p else None
async with maybe_dl as (bar, sink):
content = element.content
@@ -985,16 +963,13 @@ instance's greatest bottleneck.
"""
log.explain_topic("Internalizing images")
for elem in tag.find_all(recursive=True):
if not isinstance(elem, Tag):
continue
if elem.name == "img":
if src := elem.attrs.get("src", None):
url = urljoin(self._base_url, cast(str, src))
if not url.startswith(self._base_url):
continue
log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "img" and (src := elem.attrs.get("src", None)):
url = urljoin(self._base_url, cast(str, src))
if not url.startswith(self._base_url):
continue
log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"):
# For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"])
@@ -1039,11 +1014,7 @@ instance's greatest bottleneck.
)
return soup
async def _post(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> bytes:
async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes:
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
@@ -1081,7 +1052,7 @@ instance's greatest bottleneck.
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
login_form = cast(Optional[Tag], login_page.find("form", attrs={"name": "login_form"}))
login_form = login_page.find("form", attrs={"name": "login_form"})
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
@@ -1092,8 +1063,8 @@ instance's greatest bottleneck.
username, password = await self._auth.credentials()
login_form_data = aiohttp.FormData()
login_form_data.add_field('login_form/input_3/input_4', username)
login_form_data.add_field('login_form/input_3/input_5', password)
login_form_data.add_field("login_form/input_3/input_4", username)
login_form_data.add_field("login_form/input_3/input_5", password)
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request:

View File

@@ -1,9 +1,10 @@
import json
import re
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Callable, Dict, Optional, Union, cast
from typing import Optional, cast
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
@@ -13,7 +14,7 @@ from PFERD.crawl.crawler import CrawlWarning
from PFERD.logging import log
from PFERD.utils import url_set_query_params
TargetType = Union[str, int]
TargetType = str | int
class TypeMatcher:
@@ -42,15 +43,15 @@ class TypeMatcher:
self.alt = alt
class All:
matchers: list['IliasElementMatcher']
matchers: list["IliasElementMatcher"]
def __init__(self, matchers: list['IliasElementMatcher']):
def __init__(self, matchers: list["IliasElementMatcher"]):
self.matchers = matchers
class Any:
matchers: list['IliasElementMatcher']
matchers: list["IliasElementMatcher"]
def __init__(self, matchers: list['IliasElementMatcher']):
def __init__(self, matchers: list["IliasElementMatcher"]):
self.matchers = matchers
@staticmethod
@@ -70,11 +71,11 @@ class TypeMatcher:
return TypeMatcher.ImgAlt(alt)
@staticmethod
def all(*matchers: 'IliasElementMatcher') -> All:
def all(*matchers: "IliasElementMatcher") -> All:
return TypeMatcher.All(list(matchers))
@staticmethod
def any(*matchers: 'IliasElementMatcher') -> Any:
def any(*matchers: "IliasElementMatcher") -> Any:
return TypeMatcher.Any(list(matchers))
@staticmethod
@@ -127,20 +128,14 @@ class IliasElementType(Enum):
def matcher(self) -> IliasElementMatcher:
match self:
case IliasElementType.BLOG:
return TypeMatcher.any(
TypeMatcher.img_src("_blog.svg")
)
return TypeMatcher.any(TypeMatcher.img_src("_blog.svg"))
case IliasElementType.BOOKING:
return TypeMatcher.any(
TypeMatcher.path("/book/"),
TypeMatcher.img_src("_book.svg")
)
return TypeMatcher.any(TypeMatcher.path("/book/"), TypeMatcher.img_src("_book.svg"))
case IliasElementType.COURSE:
return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg"))
case IliasElementType.DCL_RECORD_LIST:
return TypeMatcher.any(
TypeMatcher.img_src("_dcl.svg"),
TypeMatcher.query("cmdclass=ildclrecordlistgui")
TypeMatcher.img_src("_dcl.svg"), TypeMatcher.query("cmdclass=ildclrecordlistgui")
)
case IliasElementType.EXERCISE:
return TypeMatcher.never()
@@ -162,14 +157,11 @@ class IliasElementType(Enum):
return TypeMatcher.any(
TypeMatcher.path("/fold/"),
TypeMatcher.img_src("_fold.svg"),
TypeMatcher.path("/grp/"),
TypeMatcher.img_src("_grp.svg"),
TypeMatcher.path("/copa/"),
TypeMatcher.path("_copa_"),
TypeMatcher.img_src("_copa.svg"),
# Not supported right now but warn users
# TypeMatcher.query("baseclass=ilmediapoolpresentationgui"),
# TypeMatcher.img_alt("medienpool"),
@@ -188,14 +180,10 @@ class IliasElementType(Enum):
case IliasElementType.LITERATURE_LIST:
return TypeMatcher.img_src("_bibl.svg")
case IliasElementType.LEARNING_MODULE:
return TypeMatcher.any(
TypeMatcher.path("/lm/"),
TypeMatcher.img_src("_lm.svg")
)
return TypeMatcher.any(TypeMatcher.path("/lm/"), TypeMatcher.img_src("_lm.svg"))
case IliasElementType.LEARNING_MODULE_HTML:
return TypeMatcher.any(
TypeMatcher.query("baseclass=ilhtlmpresentationgui"),
TypeMatcher.img_src("_htlm.svg")
TypeMatcher.query("baseclass=ilhtlmpresentationgui"), TypeMatcher.img_src("_htlm.svg")
)
case IliasElementType.LINK:
return TypeMatcher.any(
@@ -203,17 +191,16 @@ class IliasElementType(Enum):
TypeMatcher.query("baseclass=illinkresourcehandlergui"),
TypeMatcher.query("calldirectlink"),
),
TypeMatcher.img_src("_webr.svg") # duplicated :(
TypeMatcher.img_src("_webr.svg"), # duplicated :(
)
case IliasElementType.LINK_COLLECTION:
return TypeMatcher.any(
TypeMatcher.query("baseclass=illinkresourcehandlergui"),
TypeMatcher.img_src("_webr.svg") # duplicated :(
TypeMatcher.img_src("_webr.svg"), # duplicated :(
)
case IliasElementType.MEDIA_POOL:
return TypeMatcher.any(
TypeMatcher.query("baseclass=ilmediapoolpresentationgui"),
TypeMatcher.img_src("_mep.svg")
TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), TypeMatcher.img_src("_mep.svg")
)
case IliasElementType.MEDIACAST_VIDEO:
return TypeMatcher.never()
@@ -221,12 +208,10 @@ class IliasElementType(Enum):
return TypeMatcher.any(
TypeMatcher.path("/mcst/"),
TypeMatcher.query("baseclass=ilmediacasthandlergui"),
TypeMatcher.img_src("_mcst.svg")
TypeMatcher.img_src("_mcst.svg"),
)
case IliasElementType.MEETING:
return TypeMatcher.any(
TypeMatcher.img_src("_sess.svg")
)
return TypeMatcher.any(TypeMatcher.img_src("_sess.svg"))
case IliasElementType.MOB_VIDEO:
return TypeMatcher.never()
case IliasElementType.OPENCAST_VIDEO:
@@ -239,24 +224,19 @@ class IliasElementType(Enum):
return TypeMatcher.never()
case IliasElementType.SCORM_LEARNING_MODULE:
return TypeMatcher.any(
TypeMatcher.query("baseclass=ilsahspresentationgui"),
TypeMatcher.img_src("_sahs.svg")
TypeMatcher.query("baseclass=ilsahspresentationgui"), TypeMatcher.img_src("_sahs.svg")
)
case IliasElementType.SURVEY:
return TypeMatcher.any(
TypeMatcher.path("/svy/"),
TypeMatcher.img_src("svy.svg")
)
return TypeMatcher.any(TypeMatcher.path("/svy/"), TypeMatcher.img_src("svy.svg"))
case IliasElementType.TEST:
return TypeMatcher.any(
TypeMatcher.query("cmdclass=ilobjtestgui"),
TypeMatcher.query("cmdclass=iltestscreengui"),
TypeMatcher.img_src("_tst.svg")
TypeMatcher.img_src("_tst.svg"),
)
case IliasElementType.WIKI:
return TypeMatcher.any(
TypeMatcher.query("baseClass=ilwikihandlergui"),
TypeMatcher.img_src("wiki.svg")
TypeMatcher.query("baseClass=ilwikihandlergui"), TypeMatcher.img_src("wiki.svg")
)
raise CrawlWarning(f"Unknown matcher {self}")
@@ -291,7 +271,7 @@ class IliasPageElement:
r"thr_pk=(?P<id>\d+)", # forums
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
r"mm_(?P<id>\d+)",
]
for regex in regexes:
@@ -309,8 +289,8 @@ class IliasPageElement:
name: str,
mtime: Optional[datetime] = None,
description: Optional[str] = None,
skip_sanitize: bool = False
) -> 'IliasPageElement':
skip_sanitize: bool = False,
) -> "IliasPageElement":
if typ == IliasElementType.MEETING:
normalized = IliasPageElement._normalize_meeting_name(name)
log.explain(f"Normalized meeting name from {name!r} to {normalized!r}")
@@ -329,7 +309,7 @@ class IliasPageElement:
"""
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
if re.search(r"^[^-]+: ", meeting_name): # noqa: SIM108
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
@@ -352,7 +332,7 @@ class IliasPageElement:
@dataclass
class IliasDownloadForumData:
url: str
form_data: Dict[str, Union[str, list[str]]]
form_data: dict[str, str | list[str]]
empty: bool
@@ -382,7 +362,6 @@ class IliasSoup:
class IliasPage:
def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]):
self._ilias_soup = ilias_soup
self._soup = ilias_soup.soup
@@ -422,23 +401,23 @@ class IliasPage:
return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = cast(Optional[Tag], self._soup.find(
name="a",
attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x}
))
tab: Optional[Tag] = self._soup.find(
name="a", attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement.create_new(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
IliasElementType.INFO_TAB, self._abs_url_from_link(tab), "infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool:
def is_interesting_class(name: str | None) -> bool:
return name in [
"ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap",
"ilc_va_ihcap_AccordIHeadCap", "ilc_media_cont_MediaContainer"
"ilCOPageSection",
"ilc_Paragraph",
"ilc_va_ihcap_VAccordIHeadCap",
"ilc_va_ihcap_AccordIHeadCap",
"ilc_media_cont_MediaContainer",
]
paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class))
@@ -452,21 +431,20 @@ class IliasPage:
for p in paragraphs:
if p.find_parent(class_=is_interesting_class):
continue
if "ilc_media_cont_MediaContainer" in p["class"]:
if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")):
# We have an embedded video which should be downloaded by _find_mob_videos
if video := p.select_one("video"):
url, title = self._find_mob_video_url_title(video, p)
raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;'
raw_html += 'display: flex; justify-content: center; align-items: center;'
raw_html += ' margin: 0.5rem;">'
if url is not None and urlparse(url).hostname != urlparse(self._page_url).hostname:
if url.startswith("//"):
url = "https:" + url
raw_html += f'<a href="{url}" target="_blank">External Video: {title}</a>'
else:
raw_html += f"Video elided. Filename: '{title}'."
raw_html += "</div>\n"
continue
url, title = self._find_mob_video_url_title(video, p)
raw_html += '<div style="min-width: 100px; min-height: 100px; border: 1px solid black;'
raw_html += "display: flex; justify-content: center; align-items: center;"
raw_html += ' margin: 0.5rem;">'
if url is not None and urlparse(url).hostname != urlparse(self._page_url).hostname:
if url.startswith("//"):
url = "https:" + url
raw_html += f'<a href="{url}" target="_blank">External Video: {title}</a>'
else:
raw_html += f"Video elided. Filename: '{title}'."
raw_html += "</div>\n"
continue
# Ignore special listings (like folder groupings)
if "ilc_section_Special" in p["class"]:
@@ -486,7 +464,7 @@ class IliasPage:
title=title,
content=content,
next_url=self._find_learning_module_next(),
previous_url=self._find_learning_module_prev()
previous_url=self._find_learning_module_prev(),
)
def _find_learning_module_next(self) -> Optional[str]:
@@ -515,10 +493,7 @@ class IliasPage:
base_url = re.sub(r"cmd=\w+", "cmd=post", base_url)
base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url)
rtoken_form = cast(
Optional[Tag],
self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x})
)
rtoken_form = self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x})
if not rtoken_form:
log.explain("Found no rtoken anywhere")
return None
@@ -557,9 +532,7 @@ class IliasPage:
return True
# Raw listing without ILIAS fluff
video_element_table = self._soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
return video_element_table is not None
def _is_ilias_opencast_embedding(self) -> bool:
@@ -600,24 +573,23 @@ class IliasPage:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = cast(Optional[Tag], self._soup.find(
element = self._soup.find(
"a",
attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
))
attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)},
)
if not element:
return None
link = self._abs_url_from_link(element)
return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings")
def _is_exercise_not_all_shown(self) -> bool:
return (self._page_type == IliasElementType.EXERCISE_OVERVIEW
and "mode=all" not in self._page_url.lower())
return (
self._page_type == IliasElementType.EXERCISE_OVERVIEW and "mode=all" not in self._page_url.lower()
)
def _show_all_exercises(self) -> Optional[IliasPageElement]:
return IliasPageElement.create_new(
IliasElementType.EXERCISE_OVERVIEW,
self._page_url + "&mode=all",
"show all exercises"
IliasElementType.EXERCISE_OVERVIEW, self._page_url + "&mode=all", "show all exercises"
)
def _is_content_tab_selected(self) -> bool:
@@ -631,14 +603,13 @@ class IliasPage:
return "baseClass=ilmembershipoverviewgui" in self._page_url
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = cast(Optional[Tag], self._soup.find(
id="tab_view_content",
attrs={"class": lambda x: x is not None and "active" not in x}
))
tab = self._soup.find(
id="tab_view_content", attrs={"class": lambda x: x is not None and "active" not in x}
)
# Already selected (or not found)
if not tab:
return None
link = cast(Optional[Tag], tab.find("a"))
link = tab.find("a")
if link:
link_str = self._abs_url_from_link(link)
return IliasPageElement.create_new(IliasElementType.FOLDER, link_str, "select content page")
@@ -654,9 +625,7 @@ class IliasPage:
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
regex = re.compile(r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE)
json_match = regex.search(str(self._soup))
if json_match is None:
@@ -687,10 +656,9 @@ class IliasPage:
def _get_show_max_forum_entries_per_page_url(
self, wanted_max: Optional[int] = None
) -> Optional[IliasPageElement]:
correct_link = cast(Optional[Tag], self._soup.find(
"a",
attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x}
))
correct_link = self._soup.find(
"a", attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x}
)
if not correct_link:
return None
@@ -721,7 +689,7 @@ class IliasPage:
titles: list[Tag] = self._soup.select("#block_pditems_0 .il-item-title")
for title in titles:
link = cast(Optional[Tag], title.find("a"))
link = title.find("a")
if not link:
log.explain(f"Skipping offline item: {title.get_text().strip()!r}")
@@ -775,11 +743,11 @@ class IliasPage:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.get_text())
))
items.append(
IliasPageElement.create_new(
IliasElementType.FILE, self._abs_url_from_link(link), _sanitize_path_name(link.get_text())
)
)
return items
@@ -791,9 +759,7 @@ class IliasPage:
#
# We need to figure out where we are.
video_element_table = cast(Optional[Tag], self._soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
))
video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if video_element_table is None:
# We are in stage 1
@@ -809,14 +775,14 @@ class IliasPage:
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_opencast_video_entries_paginated()
return self._find_opencast_video_entries_no_paging()
def _find_opencast_video_entries_paginated(self) -> list[IliasPageElement]:
table_element = cast(Optional[Tag], self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")))
table_element = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
@@ -829,8 +795,7 @@ class IliasPage:
table_id = id_match.group(1)
query_params = {f"tbl_xoct_{table_id}_trows": "800",
"cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
@@ -841,9 +806,9 @@ class IliasPage:
Crawls the "second stage" video page. This page contains the actual video urls.
"""
# Video start links are marked with an "Abspielen" link
video_links = cast(list[Tag], self._soup.find_all(
name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")
))
video_links = cast(
list[Tag], self._soup.find_all(name="a", text=re.compile(r"\s*(Abspielen|Play)\s*"))
)
results: list[IliasPageElement] = []
@@ -860,9 +825,7 @@ class IliasPage:
row: Tag = link.parent.parent.parent # type: ignore
column_count = len(row.select("td.std"))
for index in range(column_count, 0, -1):
modification_string = link.parent.parent.parent.select_one( # type: ignore
f"td.std:nth-child({index})"
).get_text().strip()
modification_string = cast(Tag, row.select_one(f"td.std:nth-child({index})")).get_text().strip()
if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string):
modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M")
break
@@ -871,7 +834,7 @@ class IliasPage:
log.warn(f"Could not determine upload time for {link}")
modification_time = datetime.now()
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").get_text().strip() # type: ignore
title = cast(Tag, row.select_one("td.std:nth-child(3)")).get_text().strip()
title += ".mp4"
video_name: str = _sanitize_path_name(title)
@@ -899,27 +862,31 @@ class IliasPage:
def _find_exercise_entries_detail_page(self) -> list[IliasPageElement]:
results: list[IliasPageElement] = []
if link := cast(Optional[Tag], self._soup.select_one("#tab_submission > a")):
results.append(IliasPageElement.create_new(
IliasElementType.EXERCISE_FILES,
self._abs_url_from_link(link),
"Submission"
))
if link := self._soup.select_one("#tab_submission > a"):
results.append(
IliasPageElement.create_new(
IliasElementType.EXERCISE_FILES, self._abs_url_from_link(link), "Submission"
)
)
else:
log.explain("Found no submission link for exercise, maybe it has not started yet?")
# Find all download links in the container (this will contain all the *feedback* files)
download_links = cast(list[Tag], self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download"
))
download_links = cast(
list[Tag],
self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download",
),
)
for link in download_links:
parent_row: Tag = cast(Tag, link.find_parent(
attrs={"class": lambda x: x is not None and "row" in x}))
name_tag = cast(Optional[Tag], parent_row.find(name="div"))
parent_row: Tag = cast(
Tag, link.find_parent(attrs={"class": lambda x: x is not None and "row" in x})
)
name_tag = parent_row.find(name="div")
if not name_tag:
log.warn("Could not find name tag for exercise entry")
@@ -929,11 +896,9 @@ class IliasPage:
name = _sanitize_path_name(name_tag.get_text().strip())
log.explain(f"Found exercise detail entry {name!r}")
results.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
name
))
results.append(
IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name)
)
return results
@@ -941,12 +906,15 @@ class IliasPage:
results: list[IliasPageElement] = []
# Find all download links in the container
download_links = cast(list[Tag], self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download"
))
download_links = cast(
list[Tag],
self._soup.find_all(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x is not None and "cmd=download" in x},
text="Download",
),
)
for link in download_links:
parent_row: Tag = cast(Tag, link.find_parent("tr"))
@@ -963,19 +931,16 @@ class IliasPage:
if date is None:
log.warn(f"Date parsing failed for exercise file entry {name!r}")
results.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
name,
date
))
results.append(
IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name, date)
)
return results
def _find_exercise_entries_root_page(self) -> list[IliasPageElement]:
results: list[IliasPageElement] = []
content_tab = cast(Optional[Tag], self._soup.find(id="ilContentContainer"))
content_tab = self._soup.find(id="ilContentContainer")
if not content_tab:
log.warn("Could not find content tab in exercise overview page")
_unexpected_html_warning()
@@ -993,11 +958,11 @@ class IliasPage:
continue
name = _sanitize_path_name(exercise.get_text().strip())
results.append(IliasPageElement.create_new(
IliasElementType.EXERCISE,
self._abs_url_from_link(exercise),
name
))
results.append(
IliasPageElement.create_new(
IliasElementType.EXERCISE, self._abs_url_from_link(exercise), name
)
)
for result in results:
log.explain(f"Found exercise {result.name!r}")
@@ -1043,13 +1008,11 @@ class IliasPage:
continue
log.explain(f"Found {element_name!r} of type {element_type}")
result.append(IliasPageElement.create_new(
element_type,
abs_url,
element_name,
description=description,
skip_sanitize=True
))
result.append(
IliasPageElement.create_new(
element_type, abs_url, element_name, description=description, skip_sanitize=True
)
)
result += self._find_cards()
result += self._find_mediacast_videos()
@@ -1086,11 +1049,13 @@ class IliasPage:
if not title.endswith(".mp4") and not title.endswith(".webm"):
# just to make sure it has some kinda-alrightish ending
title = title + ".mp4"
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(cast(str, url)),
name=_sanitize_path_name(title)
))
videos.append(
IliasPageElement.create_new(
typ=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(cast(str, url)),
name=_sanitize_path_name(title),
)
)
return videos
@@ -1114,12 +1079,11 @@ class IliasPage:
log.explain(f"Found external video at {url}, ignoring")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO,
url=url,
name=_sanitize_path_name(title),
mtime=None
))
videos.append(
IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO, url=url, name=_sanitize_path_name(title), mtime=None
)
)
return videos
@@ -1133,7 +1097,7 @@ class IliasPage:
if url is None and video_element.get("src"):
url = cast(Optional[str], video_element.get("src"))
fig_caption = cast(Optional[Tag], figure.select_one("figcaption"))
fig_caption = figure.select_one("figcaption")
if fig_caption:
title = cast(Tag, figure.select_one("figcaption")).get_text().strip() + ".mp4"
elif url is not None:
@@ -1161,11 +1125,11 @@ class IliasPage:
# We should not crawl files under meetings
if "ilContainerListItemContentCB" in cast(str, parent.get("class")):
link: Tag = parent.parent.find("a") # type: ignore
link: Tag = cast(Tag, cast(Tag, parent.parent).find("a"))
typ = IliasPage._find_type_for_element(
"meeting",
self._abs_url_from_link(link),
lambda: IliasPage._find_icon_for_folder_entry(link)
lambda: IliasPage._find_icon_for_folder_entry(link),
)
return typ == IliasElementType.MEETING
@@ -1179,6 +1143,9 @@ class IliasPage:
"""
found_titles = []
if None == "hey":
pass
outer_accordion_content: Optional[Tag] = None
parents: list[Tag] = list(tag.parents)
@@ -1191,9 +1158,11 @@ class IliasPage:
# This is for these weird JS-y blocks and custom item groups
if "ilContainerItemsContainer" in cast(str, parent.get("class")):
data_store_url = parent.parent.get("data-store-url", "").lower() # type: ignore
is_custom_item_group = "baseclass=ilcontainerblockpropertiesstoragegui" in data_store_url \
and "cont_block_id=" in data_store_url
data_store_url = cast(str, cast(Tag, parent.parent).get("data-store-url", "")).lower()
is_custom_item_group = (
"baseclass=ilcontainerblockpropertiesstoragegui" in data_store_url
and "cont_block_id=" in data_store_url
)
# I am currently under the impression that *only* those JS blocks have an
# ilNoDisplay class.
if not is_custom_item_group and "ilNoDisplay" not in cast(str, parent.get("class")):
@@ -1212,11 +1181,15 @@ class IliasPage:
if outer_accordion_content:
accordion_tag = cast(Tag, outer_accordion_content.parent)
head_tag = cast(Tag, accordion_tag.find(attrs={
"class": lambda x: x is not None and (
"ilc_va_ihead_VAccordIHead" in x or "ilc_va_ihead_AccordIHead" in x
)
}))
head_tag = cast(
Tag,
accordion_tag.find(
attrs={
"class": lambda x: x is not None
and ("ilc_va_ihead_VAccordIHead" in x or "ilc_va_ihead_AccordIHead" in x)
}
),
)
found_titles.append(head_tag.get_text().strip())
return [_sanitize_path_name(x) for x in reversed(found_titles)]
@@ -1224,14 +1197,12 @@ class IliasPage:
@staticmethod
def _find_link_description(link: Tag) -> Optional[str]:
tile = cast(
Tag,
link.find_parent("div", {"class": lambda x: x is not None and "il_ContainerListItem" in x})
Tag, link.find_parent("div", {"class": lambda x: x is not None and "il_ContainerListItem" in x})
)
if not tile:
return None
description_element = cast(
Tag,
tile.find("div", {"class": lambda x: x is not None and "il_Description" in x})
Tag, tile.find("div", {"class": lambda x: x is not None and "il_Description" in x})
)
if not description_element:
return None
@@ -1242,9 +1213,15 @@ class IliasPage:
# Files have a list of properties (type, modification date, size, etc.)
# In a series of divs.
# Find the parent containing all those divs, so we can filter our what we need
properties_parent = cast(Tag, cast(Tag, link_element.find_parent(
"div", {"class": lambda x: "il_ContainerListItem" in x}
)).select_one(".il_ItemProperties"))
properties_parent = cast(
Tag,
cast(
Tag,
link_element.find_parent(
"div", {"class": lambda x: x is not None and "il_ContainerListItem" in x}
),
).select_one(".il_ItemProperties"),
)
# The first one is always the filetype
file_type = cast(Tag, properties_parent.select_one("span.il_ItemProperty")).get_text().strip()
@@ -1271,9 +1248,7 @@ class IliasPage:
for title in card_titles:
url = self._abs_url_from_link(title)
name = _sanitize_path_name(title.get_text().strip())
typ = IliasPage._find_type_for_element(
name, url, lambda: IliasPage._find_icon_from_card(title)
)
typ = IliasPage._find_type_for_element(name, url, lambda: IliasPage._find_icon_from_card(title))
if not typ:
_unexpected_html_warning()
@@ -1300,18 +1275,16 @@ class IliasPage:
continue
url = self._abs_url_from_relative(open_match.group(1))
name = _sanitize_path_name(button.get_text().strip())
typ = IliasPage._find_type_for_element(
name, url, lambda: IliasPage._find_icon_from_card(button)
typ = IliasPage._find_type_for_element(name, url, lambda: IliasPage._find_icon_from_card(button))
caption_parent = cast(
Tag,
button.find_parent(
"div",
attrs={"class": lambda x: x is not None and "caption" in x},
),
)
caption_parent = cast(Tag, button.find_parent(
"div",
attrs={"class": lambda x: x is not None and "caption" in x},
))
caption_container = caption_parent.find_next_sibling("div")
if caption_container:
description = caption_container.get_text().strip()
else:
description = None
description = caption_container.get_text().strip() if caption_container else None
if not typ:
_unexpected_html_warning()
@@ -1377,9 +1350,7 @@ class IliasPage:
if found_parent is None:
_unexpected_html_warning()
log.warn_contd(
f"Tried to figure out element type, but did not find an icon for {link_element!r}"
)
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {link_element!r}")
return None
# Find the small descriptive icon to figure out the type
@@ -1389,8 +1360,7 @@ class IliasPage:
img_tag = found_parent.select_one("img.icon")
is_session_expansion_button = found_parent.find(
"a",
attrs={"href": lambda x: x is not None and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
"a", attrs={"href": lambda x: x is not None and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content")
@@ -1426,7 +1396,7 @@ class IliasPage:
def is_logged_in(ilias_soup: IliasSoup) -> bool:
soup = ilias_soup.soup
# Normal ILIAS pages
mainbar = cast(Optional[Tag], soup.find(class_="il-maincontrols-metabar"))
mainbar = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x is not None and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
@@ -1447,23 +1417,18 @@ class IliasPage:
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
recursive=True, name="table", attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
return soup.select_one("#playerContainer") is not None
@staticmethod
def _find_date_in_text(text: str) -> Optional[datetime]:
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
text
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)", text
)
if modification_date_match is not None:
modification_date_str = modification_date_match.group(1)
@@ -1501,8 +1466,8 @@ def _unexpected_html_warning() -> None:
log.warn("Encountered unexpected HTML structure, ignoring element.")
german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez']
english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
german_months = ["Jan", "Feb", "Mär", "Apr", "Mai", "Jun", "Jul", "Aug", "Sep", "Okt", "Nov", "Dez"]
english_months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[datetime]:
@@ -1517,11 +1482,11 @@ def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[dateti
# Normalize whitespace because users
date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, flags=re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, flags=re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, flags=re.I)
date_str = date_str.strip()
for german, english in zip(german_months, english_months):
for german, english in zip(german_months, english_months, strict=True):
date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english)
@@ -1575,11 +1540,11 @@ def parse_ilias_forum_export(forum_export: BeautifulSoup) -> list[IliasForumThre
elements = []
for p in forum_export.select("body > p"):
title_tag = p
content_tag = cast(Optional[Tag], p.find_next_sibling("ul"))
content_tag = p.find_next_sibling("ul")
title = cast(Tag, p.find("b")).text
if ":" in title:
title = title[title.find(":") + 1:]
title = title[title.find(":") + 1 :]
title = title.strip()
if not content_tag or content_tag.find_previous_sibling("p") != title_tag:
@@ -1604,7 +1569,7 @@ def _guess_timestamp_from_forum_post_content(content: Tag) -> Optional[datetime]
for post in posts:
text = post.text.strip()
text = text[text.rfind("|") + 1:]
text = text[text.rfind("|") + 1 :]
date = demangle_date(text, fail_silently=True)
if not date:
continue

View File

@@ -1,4 +1,4 @@
from typing import Dict, Literal
from typing import Literal
from ...auth import Authenticator
from ...config import Config
@@ -26,7 +26,7 @@ class KitIliasWebCrawler(IliasWebCrawler):
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator],
authenticators: dict[str, Authenticator],
):
super().__init__(name, section, config, authenticators)

View File

@@ -38,9 +38,7 @@ class ShibbolethLogin:
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return
soup: BeautifulSoup = soupify(await response.read())
@@ -81,7 +79,7 @@ class ShibbolethLogin:
# (or clicking "Continue" if you have JS disabled)
relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = form = soup.find("form", {"method": "post"})["action"] # type: ignore
url = cast(str, cast(Tag, soup.find("form", {"method": "post"}))["action"])
data = { # using the info obtained in the while loop above
"RelayState": cast(str, relay_state["value"]),
"SAMLResponse": cast(str, saml_response["value"]),

View File

@@ -1,9 +1,11 @@
import os
import re
from collections.abc import Awaitable, Generator, Iterable
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union, cast
from re import Pattern
from typing import Any, Optional, Union, cast
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@@ -44,7 +46,7 @@ class KitIpdFile:
@dataclass
class KitIpdFolder:
name: str
entries: List[Union[KitIpdFile, "KitIpdFolder"]]
entries: list[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
@@ -53,12 +55,11 @@ class KitIpdFolder:
class KitIpdCrawler(HttpCrawler):
def __init__(
self,
name: str,
section: KitIpdCrawlerSection,
config: Config,
self,
name: str,
section: KitIpdCrawlerSection,
config: Config,
):
super().__init__(name, section, config)
self._url = section.target()
@@ -69,7 +70,7 @@ class KitIpdCrawler(HttpCrawler):
if not maybe_cl:
return
tasks: List[Awaitable[None]] = []
tasks: list[Awaitable[None]] = []
async with maybe_cl:
for item in await self._fetch_items():
@@ -104,11 +105,7 @@ class KitIpdCrawler(HttpCrawler):
await self.gather(tasks)
async def _download_file(
self,
parent: PurePath,
file: KitIpdFile,
etag: Optional[str],
mtime: Optional[datetime]
self, parent: PurePath, file: KitIpdFile, etag: Optional[str], mtime: Optional[datetime]
) -> None:
element_path = parent / file.name
@@ -125,9 +122,9 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, element_path, sink, bar)
async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
async def _fetch_items(self) -> Iterable[KitIpdFile | KitIpdFolder]:
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
elements: list[Tag] = self._find_file_links(page)
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
@@ -156,7 +153,7 @@ class KitIpdCrawler(HttpCrawler):
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> list[Tag]:
def _find_file_links(self, tag: Tag | BeautifulSoup) -> list[Tag]:
return cast(list[Tag], tag.find_all(name="a", attrs={"href": self._file_regex}))
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
@@ -177,7 +174,7 @@ class KitIpdCrawler(HttpCrawler):
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async def get_page(self) -> tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This

View File

@@ -18,31 +18,28 @@ class LocalCrawlerSection(CrawlerSection):
def crawl_delay(self) -> float:
value = self.s.getfloat("crawl_delay", fallback=0.0)
if value < 0:
self.invalid_value("crawl_delay", value,
"Must not be negative")
self.invalid_value("crawl_delay", value, "Must not be negative")
return value
def download_delay(self) -> float:
value = self.s.getfloat("download_delay", fallback=0.0)
if value < 0:
self.invalid_value("download_delay", value,
"Must not be negative")
self.invalid_value("download_delay", value, "Must not be negative")
return value
def download_speed(self) -> Optional[int]:
value = self.s.getint("download_speed")
if value is not None and value <= 0:
self.invalid_value("download_speed", value,
"Must be greater than 0")
self.invalid_value("download_speed", value, "Must be greater than 0")
return value
class LocalCrawler(Crawler):
def __init__(
self,
name: str,
section: LocalCrawlerSection,
config: Config,
self,
name: str,
section: LocalCrawlerSection,
config: Config,
):
super().__init__(name, section, config)
@@ -74,10 +71,12 @@ class LocalCrawler(Crawler):
tasks = []
async with cl:
await asyncio.sleep(random.uniform(
0.5 * self._crawl_delay,
self._crawl_delay,
))
await asyncio.sleep(
random.uniform(
0.5 * self._crawl_delay,
self._crawl_delay,
)
)
for child in path.iterdir():
pure_child = cl.path / child.name
@@ -93,10 +92,12 @@ class LocalCrawler(Crawler):
return
async with dl as (bar, sink):
await asyncio.sleep(random.uniform(
0.5 * self._download_delay,
self._download_delay,
))
await asyncio.sleep(
random.uniform(
0.5 * self._download_delay,
self._download_delay,
)
)
bar.set_total(stat.st_size)

View File

@@ -1,5 +1,5 @@
from collections.abc import Iterator
from pathlib import PurePath
from typing import Iterator, Set
from .logging import log
from .utils import fmt_path
@@ -16,15 +16,34 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:
class Deduplicator:
FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)])
FORBIDDEN_NAMES = {
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
"CON",
"PRN",
"AUX",
"NUL",
"COM1",
"COM2",
"COM3",
"COM4",
"COM5",
"COM6",
"COM7",
"COM8",
"COM9",
"LPT1",
"LPT2",
"LPT3",
"LPT4",
"LPT5",
"LPT6",
"LPT7",
"LPT8",
"LPT9",
}
def __init__(self, windows_paths: bool) -> None:
self._windows_paths = windows_paths
self._known: Set[PurePath] = set()
self._known: set[PurePath] = set()
def _add(self, path: PurePath) -> None:
self._known.add(path)

View File

@@ -1,8 +1,9 @@
import asyncio
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Optional
from typing import Optional
@dataclass
@@ -12,12 +13,7 @@ class Slot:
class Limiter:
def __init__(
self,
task_limit: int,
download_limit: int,
task_delay: float
):
def __init__(self, task_limit: int, download_limit: int, task_delay: float):
if task_limit <= 0:
raise ValueError("task limit must be at least 1")
if download_limit <= 0:

View File

@@ -1,15 +1,23 @@
import asyncio
import sys
import traceback
from collections.abc import AsyncIterator, Iterator
from contextlib import AbstractContextManager, asynccontextmanager, contextmanager
from typing import AsyncIterator, Iterator, List, Optional
from typing import Any, Optional
from rich.console import Console, Group
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, TextColumn, TimeRemainingColumn,
TransferSpeedColumn)
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
TaskID,
TextColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
from rich.table import Column
@@ -53,7 +61,7 @@ class Log:
self._showing_progress = False
self._progress_suspended = False
self._lock = asyncio.Lock()
self._lines: List[str] = []
self._lines: list[str] = []
# Whether different parts of the output are enabled or disabled
self.output_explain = False
@@ -114,7 +122,7 @@ class Log:
for line in self._lines:
self.print(line)
def print(self, text: str) -> None:
def print(self, text: Any) -> None:
"""
Print a normal message. Allows markup.
"""
@@ -176,10 +184,14 @@ class Log:
# Our print function doesn't take types other than strings, but the
# underlying rich.print function does. This call is a special case
# anyways, and we're calling it internally, so this should be fine.
self.print(Panel.fit("""
self.print(
Panel.fit(
"""
Please copy your program output and send it to the PFERD maintainers, either
directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
""".strip())) # type: ignore
""".strip()
)
)
def explain_topic(self, text: str) -> None:
"""
@@ -236,10 +248,10 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
@contextmanager
def _bar(
self,
progress: Progress,
description: str,
total: Optional[float],
self,
progress: Progress,
description: str,
total: Optional[float],
) -> Iterator[ProgressBar]:
if total is None:
# Indeterminate progress bar
@@ -255,11 +267,11 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
self._update_live()
def crawl_bar(
self,
style: str,
action: str,
text: str,
total: Optional[float] = None,
self,
style: str,
action: str,
text: str,
total: Optional[float] = None,
) -> AbstractContextManager[ProgressBar]:
"""
Allows markup in the "style" argument which will be applied to the
@@ -271,11 +283,11 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
return self._bar(self._crawl_progress, description, total)
def download_bar(
self,
style: str,
action: str,
text: str,
total: Optional[float] = None,
self,
style: str,
action: str,
text: str,
total: Optional[float] = None,
) -> AbstractContextManager[ProgressBar]:
"""
Allows markup in the "style" argument which will be applied to the

View File

@@ -4,12 +4,13 @@ import os
import random
import shutil
import string
from contextlib import contextmanager
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path, PurePath
from typing import BinaryIO, Iterator, Optional, Tuple
from typing import BinaryIO, Optional
from .logging import log
from .report import Report, ReportLoadError
@@ -35,8 +36,7 @@ class Redownload(Enum):
try:
return Redownload(string)
except ValueError:
raise ValueError("must be one of 'never', 'never-smart',"
" 'always', 'always-smart'")
raise ValueError("must be one of 'never', 'never-smart', 'always', 'always-smart'") from None
class OnConflict(Enum):
@@ -51,8 +51,10 @@ class OnConflict(Enum):
try:
return OnConflict(string)
except ValueError:
raise ValueError("must be one of 'prompt', 'local-first',"
" 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'")
raise ValueError(
"must be one of 'prompt', 'local-first',"
" 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'"
) from None
@dataclass
@@ -96,13 +98,13 @@ class FileSinkToken(ReusableAsyncContextManager[FileSink]):
# download handed back to the OutputDirectory.
def __init__(
self,
output_dir: "OutputDirectory",
remote_path: PurePath,
path: PurePath,
local_path: Path,
heuristics: Heuristics,
on_conflict: OnConflict,
self,
output_dir: "OutputDirectory",
remote_path: PurePath,
path: PurePath,
local_path: Path,
heuristics: Heuristics,
on_conflict: OnConflict,
):
super().__init__()
@@ -118,15 +120,17 @@ class FileSinkToken(ReusableAsyncContextManager[FileSink]):
sink = FileSink(file)
async def after_download() -> None:
await self._output_dir._after_download(DownloadInfo(
self._remote_path,
self._path,
self._local_path,
tmp_path,
self._heuristics,
self._on_conflict,
sink.is_done(),
))
await self._output_dir._after_download(
DownloadInfo(
self._remote_path,
self._path,
self._local_path,
tmp_path,
self._heuristics,
self._on_conflict,
sink.is_done(),
)
)
self._stack.push_async_callback(after_download)
self._stack.enter_context(file)
@@ -138,10 +142,10 @@ class OutputDirectory:
REPORT_FILE = PurePath(".report")
def __init__(
self,
root: Path,
redownload: Redownload,
on_conflict: OnConflict,
self,
root: Path,
redownload: Redownload,
on_conflict: OnConflict,
):
if os.name == "nt":
# Windows limits the path length to 260 for some historical reason.
@@ -174,8 +178,8 @@ class OutputDirectory:
try:
self._root.mkdir(parents=True, exist_ok=True)
except OSError:
raise OutputDirError("Failed to create base directory")
except OSError as e:
raise OutputDirError("Failed to create base directory") from e
def register_reserved(self, path: PurePath) -> None:
self._report.mark_reserved(path)
@@ -193,11 +197,11 @@ class OutputDirectory:
return self._root / path
def _should_download(
self,
local_path: Path,
heuristics: Heuristics,
redownload: Redownload,
on_conflict: OnConflict,
self,
local_path: Path,
heuristics: Heuristics,
redownload: Redownload,
on_conflict: OnConflict,
) -> bool:
if not local_path.exists():
log.explain("No corresponding file present locally")
@@ -270,9 +274,9 @@ class OutputDirectory:
# files.
async def _conflict_lfrf(
self,
on_conflict: OnConflict,
path: PurePath,
self,
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
async with log.exclusive_output():
@@ -289,9 +293,9 @@ class OutputDirectory:
raise ValueError(f"{on_conflict!r} is not a valid conflict policy")
async def _conflict_ldrf(
self,
on_conflict: OnConflict,
path: PurePath,
self,
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
async with log.exclusive_output():
@@ -308,10 +312,10 @@ class OutputDirectory:
raise ValueError(f"{on_conflict!r} is not a valid conflict policy")
async def _conflict_lfrd(
self,
on_conflict: OnConflict,
path: PurePath,
parent: PurePath,
self,
on_conflict: OnConflict,
path: PurePath,
parent: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
async with log.exclusive_output():
@@ -328,9 +332,9 @@ class OutputDirectory:
raise ValueError(f"{on_conflict!r} is not a valid conflict policy")
async def _conflict_delete_lf(
self,
on_conflict: OnConflict,
path: PurePath,
self,
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
@@ -353,9 +357,9 @@ class OutputDirectory:
return base.parent / name
async def _create_tmp_file(
self,
local_path: Path,
) -> Tuple[Path, BinaryIO]:
self,
local_path: Path,
) -> tuple[Path, BinaryIO]:
"""
May raise an OutputDirError.
"""
@@ -388,14 +392,14 @@ class OutputDirectory:
return self._should_download(local_path, heuristics, redownload, on_conflict)
async def download(
self,
remote_path: PurePath,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
self,
remote_path: PurePath,
path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None,
) -> Optional[FileSinkToken]:
"""
May throw an OutputDirError, a MarkDuplicateError or a
@@ -506,10 +510,8 @@ class OutputDirectory:
await self._cleanup(child, pure_child)
if delete_self:
try:
with suppress(OSError):
path.rmdir()
except OSError:
pass
async def _cleanup_file(self, path: Path, pure: PurePath) -> None:
if self._report.is_marked(pure):

View File

@@ -1,5 +1,5 @@
from pathlib import Path, PurePath
from typing import Dict, List, Optional
from typing import Optional
from rich.markup import escape
@@ -15,7 +15,7 @@ class PferdLoadError(Exception):
class Pferd:
def __init__(self, config: Config, cli_crawlers: Optional[List[str]], cli_skips: Optional[List[str]]):
def __init__(self, config: Config, cli_crawlers: Optional[list[str]], cli_skips: Optional[list[str]]):
"""
May throw PferdLoadError.
"""
@@ -23,10 +23,10 @@ class Pferd:
self._config = config
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers, cli_skips)
self._authenticators: Dict[str, Authenticator] = {}
self._crawlers: Dict[str, Crawler] = {}
self._authenticators: dict[str, Authenticator] = {}
self._crawlers: dict[str, Crawler] = {}
def _find_config_crawlers(self, config: Config) -> List[str]:
def _find_config_crawlers(self, config: Config) -> list[str]:
crawl_sections = []
for name, section in config.crawl_sections():
@@ -37,7 +37,7 @@ class Pferd:
return crawl_sections
def _find_cli_crawlers(self, config: Config, cli_crawlers: List[str]) -> List[str]:
def _find_cli_crawlers(self, config: Config, cli_crawlers: list[str]) -> list[str]:
if len(cli_crawlers) != len(set(cli_crawlers)):
raise PferdLoadError("Some crawlers were selected multiple times")
@@ -66,14 +66,14 @@ class Pferd:
return crawlers_to_run
def _find_crawlers_to_run(
self,
config: Config,
cli_crawlers: Optional[List[str]],
cli_skips: Optional[List[str]],
) -> List[str]:
self,
config: Config,
cli_crawlers: Optional[list[str]],
cli_skips: Optional[list[str]],
) -> list[str]:
log.explain_topic("Deciding which crawlers to run")
crawlers: List[str]
crawlers: list[str]
if cli_crawlers is None:
log.explain("No crawlers specified on CLI")
log.explain("Running crawlers specified in config")
@@ -104,7 +104,7 @@ class Pferd:
def _load_crawlers(self) -> None:
# Cookie sharing
kit_ilias_web_paths: Dict[Authenticator, List[Path]] = {}
kit_ilias_web_paths: dict[Authenticator, list[Path]] = {}
for name, section in self._config.crawl_sections():
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
@@ -117,9 +117,8 @@ class Pferd:
crawler = crawler_constructor(name, section, self._config, self._authenticators)
self._crawlers[name] = crawler
if self._config.default_section.share_cookies():
if isinstance(crawler, KitIliasWebCrawler):
crawler.share_cookies(kit_ilias_web_paths)
if self._config.default_section.share_cookies() and isinstance(crawler, KitIliasWebCrawler):
crawler.share_cookies(kit_ilias_web_paths)
def debug_transforms(self) -> None:
for name in self._crawlers_to_run:

View File

@@ -1,6 +1,6 @@
import json
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set
from typing import Any, Optional
class ReportLoadError(Exception):
@@ -42,32 +42,32 @@ class Report:
def __init__(self) -> None:
# Paths found by the crawler, untransformed
self.found_paths: Set[PurePath] = set()
self.found_paths: set[PurePath] = set()
# Files reserved for metadata files (e. g. the report file or cookies)
# that can't be overwritten by user transforms and won't be cleaned up
# at the end.
self.reserved_files: Set[PurePath] = set()
self.reserved_files: set[PurePath] = set()
# Files found by the crawler, transformed. Only includes files that
# were downloaded (or a download was attempted)
self.known_files: Set[PurePath] = set()
self.known_files: set[PurePath] = set()
self.added_files: Set[PurePath] = set()
self.changed_files: Set[PurePath] = set()
self.deleted_files: Set[PurePath] = set()
self.added_files: set[PurePath] = set()
self.changed_files: set[PurePath] = set()
self.deleted_files: set[PurePath] = set()
# Files that should have been deleted by the cleanup but weren't
self.not_deleted_files: Set[PurePath] = set()
self.not_deleted_files: set[PurePath] = set()
# Custom crawler-specific data
self.custom: Dict[str, Any] = dict()
self.custom: dict[str, Any] = dict()
# Encountered errors and warnings
self.encountered_warnings: List[str] = []
self.encountered_errors: List[str] = []
self.encountered_warnings: list[str] = []
self.encountered_errors: list[str] = []
@staticmethod
def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
def _get_list_of_strs(data: dict[str, Any], key: str) -> list[str]:
result: Any = data.get(key, [])
if not isinstance(result, list):
@@ -80,8 +80,8 @@ class Report:
return result
@staticmethod
def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
result: Dict[str, Any] = data.get(key, {})
def _get_str_dictionary(data: dict[str, Any], key: str) -> dict[str, Any]:
result: dict[str, Any] = data.get(key, {})
if not isinstance(result, dict):
raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")
@@ -170,7 +170,7 @@ class Report:
self.known_files.add(path)
@property
def marked(self) -> Set[PurePath]:
def marked(self) -> set[PurePath]:
return self.known_files | self.reserved_files
def is_marked(self, path: PurePath) -> bool:

View File

@@ -1,10 +1,12 @@
import ast
import contextlib
import re
from abc import ABC, abstractmethod
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from enum import Enum
from pathlib import PurePath
from typing import Callable, Dict, List, Optional, Sequence, TypeVar, Union
from typing import Optional, TypeVar
from .logging import log
from .utils import fmt_path, str_path
@@ -23,7 +25,7 @@ class Empty:
pass
RightSide = Union[str, Ignore, Empty]
RightSide = str | Ignore | Empty
@dataclass
@@ -35,7 +37,7 @@ class Ignored:
pass
TransformResult = Optional[Union[Transformed, Ignored]]
TransformResult = Transformed | Ignored | None
@dataclass
@@ -47,7 +49,7 @@ class Rule:
right: RightSide
right_index: int
def right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]:
def right_result(self, path: PurePath) -> str | Transformed | Ignored:
if isinstance(self.right, str):
return self.right
elif isinstance(self.right, Ignore):
@@ -93,24 +95,20 @@ class ExactReTf(Transformation):
# since elements of "match.groups()" can be None, mypy is wrong.
groups: Sequence[Optional[str]] = [match[0]] + list(match.groups())
locals_dir: Dict[str, Union[str, int, float]] = {}
locals_dir: dict[str, str | int | float] = {}
for i, group in enumerate(groups):
if group is None:
continue
locals_dir[f"g{i}"] = group
try:
with contextlib.suppress(ValueError):
locals_dir[f"i{i}"] = int(group)
except ValueError:
pass
try:
with contextlib.suppress(ValueError):
locals_dir[f"f{i}"] = float(group)
except ValueError:
pass
named_groups: Dict[str, str] = match.groupdict()
named_groups: dict[str, str] = match.groupdict()
for name, capture in named_groups.items():
locals_dir[name] = capture
@@ -208,7 +206,7 @@ class Line:
@property
def rest(self) -> str:
return self.line[self.index:]
return self.line[self.index :]
def peek(self, amount: int = 1) -> str:
return self.rest[:amount]
@@ -228,7 +226,7 @@ class Line:
self.expect(string)
return value
def one_of(self, parsers: List[Callable[[], T]], description: str) -> T:
def one_of(self, parsers: list[Callable[[], T]], description: str) -> T:
for parser in parsers:
index = self.index
try:
@@ -315,7 +313,7 @@ def parse_left(line: Line) -> str:
return parse_str(line)
def parse_right(line: Line) -> Union[str, Ignore]:
def parse_right(line: Line) -> str | Ignore:
c = line.peek()
if c in QUOTATION_MARKS:
return parse_quoted_str(line)
@@ -327,21 +325,27 @@ def parse_right(line: Line) -> Union[str, Ignore]:
def parse_arrow_name(line: Line) -> str:
return line.one_of([
lambda: line.expect("exact-re"),
lambda: line.expect("exact"),
lambda: line.expect("name-re"),
lambda: line.expect("name"),
lambda: line.expect("re"),
lambda: line.expect(""),
], "Expected arrow name")
return line.one_of(
[
lambda: line.expect("exact-re"),
lambda: line.expect("exact"),
lambda: line.expect("name-re"),
lambda: line.expect("name"),
lambda: line.expect("re"),
lambda: line.expect(""),
],
"Expected arrow name",
)
def parse_arrow_head(line: Line) -> ArrowHead:
return line.one_of([
lambda: line.expect_with(">>", ArrowHead.SEQUENCE),
lambda: line.expect_with(">", ArrowHead.NORMAL),
], "Expected arrow head")
return line.one_of(
[
lambda: line.expect_with(">>", ArrowHead.SEQUENCE),
lambda: line.expect_with(">", ArrowHead.NORMAL),
],
"Expected arrow head",
)
def parse_eol(line: Line) -> None:
@@ -413,12 +417,12 @@ class Transformer:
def transform(self, path: PurePath) -> Optional[PurePath]:
for i, (line, tf) in enumerate(self._tfs):
log.explain(f"Testing rule {i+1}: {line}")
log.explain(f"Testing rule {i + 1}: {line}")
try:
result = tf.transform(path)
except Exception as e:
log.warn(f"Error while testing rule {i+1}: {line}")
log.warn(f"Error while testing rule {i + 1}: {line}")
log.warn_contd(str(e))
continue

View File

@@ -3,10 +3,11 @@ import getpass
import sys
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable
from contextlib import AsyncExitStack
from pathlib import Path, PurePath
from types import TracebackType
from typing import Any, Callable, Dict, Generic, Optional, Type, TypeVar
from typing import Any, Generic, Optional, TypeVar
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
import bs4
@@ -79,7 +80,7 @@ def url_set_query_param(url: str, param: str, value: str) -> str:
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def url_set_query_params(url: str, params: Dict[str, str]) -> str:
def url_set_query_params(url: str, params: dict[str, str]) -> str:
"""
Sets multiple query parameters in an url, overwriting existing ones.
"""
@@ -131,10 +132,10 @@ class ReusableAsyncContextManager(ABC, Generic[T]):
return result
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
if not self._active:
raise RuntimeError("__aexit__ called too many times")

View File

@@ -20,16 +20,29 @@ pferd = "PFERD.__main__:main"
[tool.setuptools.dynamic]
version = {attr = "PFERD.version.VERSION"}
[tool.flake8]
max-line-length = 110
[tool.ruff]
line-length = 110
[tool.isort]
line_length = 110
[tool.autopep8]
max_line_length = 110
in-place = true
recursive = true
[tool.ruff.lint]
select = [
# pycodestyle
"E",
# Pyflakes
"F",
# pyupgrade
"UP",
# flake8-bugbear
"B",
# flake8-simplify
"SIM",
# isort
"I",
]
ignore = [
"UP045",
"SIM114",
"B023"
]
[tool.mypy]
disallow_any_generics = true
@@ -40,3 +53,10 @@ warn_unused_ignores = true
warn_unreachable = true
show_error_context = true
ignore_missing_imports = true
[dependency-groups]
dev = [
"mypy>=1.18.2",
"pyinstaller>=6.16.0",
"ruff>=0.14.1",
]

View File

@@ -2,4 +2,4 @@
set -e
pyinstaller --onefile pferd.py
uv run pyinstaller --onefile pferd.py

View File

@@ -2,5 +2,5 @@
set -e
mypy .
flake8 PFERD
uv run mypy .
uv run ruff check

View File

@@ -2,5 +2,4 @@
set -e
autopep8 .
isort .
uv run ruff format

1090
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff