Compare commits

..

No commits in common. "master" and "v3.0.0" have entirely different histories.

39 changed files with 615 additions and 2995 deletions

View File

@ -14,12 +14,12 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python: ["3.9"]
python: ["3.8"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- uses: actions/setup-python@v4
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
@ -45,7 +45,7 @@ jobs:
run: mv dist/pferd* dist/pferd-${{ matrix.os }}
- name: Upload binary
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v2
with:
name: Binaries
path: dist/pferd-${{ matrix.os }}
@ -57,7 +57,7 @@ jobs:
steps:
- name: Download binaries
uses: actions/download-artifact@v3
uses: actions/download-artifact@v2
with:
name: Binaries

1
.gitignore vendored
View File

@ -3,7 +3,6 @@
/PFERD.egg-info/
__pycache__/
/.vscode/
/.idea/
# pyinstaller
/pferd.spec

View File

@ -22,164 +22,6 @@ ambiguous situations.
## Unreleased
### Fixed
- Video name deduplication
## 3.5.0 - 2023-09-13
### Added
- `no-delete-prompt-override` conflict resolution strategy
- Support for ILIAS learning modules
- `show_not_deleted` option to stop printing the "Not Deleted" status or report
message. This combines nicely with the `no-delete-prompt-override` strategy,
causing PFERD to mostly ignore local-only files.
- Support for mediacast video listings
- Crawling of files in info tab
### Changed
- Remove size suffix for files in content pages
### Fixed
- Crawling of courses with the timeline view as the default tab
- Crawling of file and custom opencast cards
- Crawling of button cards without descriptions
- Abort crawling when encountering an unexpected ilias root page redirect
- Sanitize ascii control characters on Windows
- Crawling of paginated past meetings
- Ignore SCORM learning modules
## 3.4.3 - 2022-11-29
### Added
- Missing documentation for `forums` option
### Changed
- Clear up error message shown when multiple paths are found to an element
### Fixed
- IPD crawler unnecessarily appending trailing slashes
- Crawling opencast when ILIAS is set to English
## 3.4.2 - 2022-10-26
### Added
- Recognize and crawl content pages in cards
- Recognize and ignore surveys
### Fixed
- Forum crawling crashing when a thread has no messages at all
- Forum crawling crashing when a forum has no threads at all
- Ilias login failing in some cases
- Crawling of paginated future meetings
- IPD crawler handling of URLs without trailing slash
## 3.4.1 - 2022-08-17
### Added
- Download of page descriptions
- Forum download support
- `pass` authenticator
### Changed
- Add `cpp` extension to default `link_regex` of IPD crawler
- Mention hrefs in IPD crawler's `--explain` output for users of `link_regex` option
- Simplify default IPD crawler `link_regex`
### Fixed
- IPD crawler crashes on some sites
- Meeting name normalization for yesterday, today and tomorrow
- Crawling of meeting file previews
- Login with new login button html layout
- Descriptions for courses are now placed in the correct subfolder when
downloading the whole desktop
## 3.4.0 - 2022-05-01
### Added
- Message when Shibboleth entitlements need to be manually reviewed
- Links to unofficial packages and repology in the readme
### Changed
- Increase minimum supported Python version to 3.9
- Support video listings with more columns
- Use UTF-8 when reading/writing the config file
### Fixed
- Crash during authentication when the Shibboleth session is still valid
## 3.3.1 - 2022-01-15
### Fixed
- ILIAS login
- Local video cache if `windows_paths` is enabled
## 3.3.0 - 2022-01-09
### Added
- A KIT IPD crawler
- Support for ILIAS cards
- (Rudimentary) support for content pages
- Support for multi-stream videos
- Support for ILIAS 7
### Removed
- [Interpolation](https://docs.python.org/3/library/configparser.html#interpolation-of-values) in config file
### Fixed
- Crawling of recursive courses
- Crawling files directly placed on the personal desktop
- Ignore timestamps at the unix epoch as they crash on windows
## 3.2.0 - 2021-08-04
### Added
- `--skip` command line option
- Support for ILIAS booking objects
### Changed
- Using multiple path segments on left side of `-name->` now results in an
error. This was already forbidden by the documentation but silently accepted
by PFERD.
- More consistent path printing in some `--explain` messages
### Fixed
- Nondeterministic name deduplication due to ILIAS reordering elements
- More exceptions are handled properly
## 3.1.0 - 2021-06-13
If your config file doesn't do weird things with transforms, it should continue
to work. If your `-re->` arrows behave weirdly, try replacing them with
`-exact-re->` arrows. If you're on Windows, you might need to switch from `\`
path separators to `/` in your regex rules.
### Added
- `skip` option for crawlers
- Rules with `>>` instead of `>` as arrow head
- `-exact-re->` arrow (behaves like `-re->` did previously)
### Changed
- The `-re->` arrow can now rename directories (like `-->`)
- Use `/` instead of `\` as path separator for (regex) rules on Windows
- Use the label to the left for exercises instead of the button name to
determine the folder name
### Fixed
- Video pagination handling in ILIAS crawler
## 3.0.1 - 2021-06-01
### Added
- `credential-file` authenticator
- `--credential-file` option for `kit-ilias-web` command
- Warning if using concurrent tasks with `kit-ilias-web`
### Changed
- Cookies are now stored in a text-based format
### Fixed
- Date parsing now also works correctly in non-group exercises
## 3.0.0 - 2021-05-31
### Added

218
CONFIG.md
View File

@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header,
which is followed by a list of `key = value` pairs. Comments must be on their
own line and start with `#`. Multiline values must be indented beyond their key.
Boolean values can be `yes` or `no`. For more details and some examples on the
format, see the [configparser documentation][1] ([interpolation][2] is
disabled).
format, see the [configparser documentation][1] ([basic interpolation][2] is
enabled).
[1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
[2]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
[2]: <https://docs.python.org/3/library/configparser.html#configparser.BasicInterpolation> "BasicInterpolation"
## The `DEFAULT` section
@ -26,9 +26,6 @@ default values for the other sections.
`Added ...`) while running a crawler. (Default: `yes`)
- `report`: Whether PFERD should print a report of added, changed and deleted
local files for all crawlers before exiting. (Default: `yes`)
- `show_not_deleted`: Whether PFERD should print messages in status and report
when a local-only file wasn't deleted. Combines nicely with the
`no-delete-prompt-override` conflict resolution strategy.
- `share_cookies`: Whether crawlers should share cookies where applicable. For
example, some crawlers share cookies if they crawl the same website using the
same account. (Default: `yes`)
@ -39,7 +36,7 @@ Sections whose names start with `crawl:` are used to configure crawlers. The
rest of the section name specifies the name of the crawler.
A crawler synchronizes a remote resource to a local directory. There are
different types of crawlers for different kinds of resources, e.g. ILIAS
different types of crawlers for different kinds of resources, e. g. ILIAS
courses or lecture websites.
Each crawl section represents an instance of a specific type of crawler. The
@ -52,11 +49,8 @@ see the type's [documentation](#crawler-types) below. The following options are
common to all crawlers:
- `type`: The available types are specified in [this section](#crawler-types).
- `skip`: Whether the crawler should be skipped during normal execution. The
crawler can still be executed manually using the `--crawler` or `-C` flags.
(Default: `no`)
- `output_dir`: The directory the crawler synchronizes files to. A crawler will
never place any files outside this directory. (Default: the crawler's name)
never place any files outside of this directory. (Default: the crawler's name)
- `redownload`: When to download a file that is already present locally.
(Default: `never-smart`)
- `never`: If a file is present locally, it is not downloaded again.
@ -78,9 +72,6 @@ common to all crawlers:
using `prompt` and always choosing "yes".
- `no-delete`: Never delete local files, but overwrite local files if the
remote file is different.
- `no-delete-prompt-overwrite`: Never delete local files, but prompt to
overwrite local files if the remote file is different. Combines nicely
with the `show_not_deleted` option.
- `transform`: Rules for renaming and excluding certain files and directories.
For more details, see [this section](#transformation-rules). (Default: empty)
- `tasks`: The maximum number of concurrent tasks (such as crawling or
@ -142,18 +133,6 @@ crawler simulate a slower, network-based crawler.
requests. (Default: `0.0`)
- `download_speed`: Download speed (in bytes per second) to simulate. (Optional)
### The `kit-ipd` crawler
This crawler crawls a KIT-IPD page by url. The root page can be crawled from
outside the KIT network so you will be informed about any new/deleted files,
but downloading files requires you to be within. Adding a show delay between
requests is likely a good idea.
- `target`: URL to a KIT-IPD page
- `link_regex`: A regex that is matched against the `href` part of links. If it
matches, the given link is downloaded as a file. This is used to extract
files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`)
### The `kit-ilias-web` crawler
This crawler crawls the KIT ILIAS instance.
@ -187,7 +166,6 @@ script once per day should be fine.
redirect to the actual URL. Set to a negative value to disable the automatic
redirect. (Default: `-1`)
- `videos`: Whether to download videos. (Default: `no`)
- `forums`: Whether to download forum threads. (Default: `no`)
- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default:
`20.0`)
@ -202,22 +180,6 @@ via the terminal.
- `username`: The username. (Optional)
- `password`: The password. (Optional)
### The `credential-file` authenticator
This authenticator reads a username and a password from a credential file.
- `path`: Path to the credential file. (Required)
The credential file has exactly two lines (trailing newline optional). The first
line starts with `username=` and contains the username, the second line starts
with `password=` and contains the password. The username and password may
contain any characters except a line break.
```
username=AzureDiamond
password=hunter2
```
### The `keyring` authenticator
This authenticator uses the system keyring to store passwords. The username can
@ -230,23 +192,6 @@ is stored in the keyring.
- `keyring_name`: The service name PFERD uses for storing credentials. (Default:
`PFERD`)
### The `pass` authenticator
This authenticator queries the [`pass` password manager][3] for a username and
password. It tries to be mostly compatible with [browserpass][4] and
[passff][5], so see those links for an overview of the format. If PFERD fails
to load your password, you can use the `--explain` flag to see why.
- `passname`: The name of the password to use (Required)
- `username_prefixes`: A comma-separated list of username line prefixes
(Default: `login,username,user`)
- `password_prefixes`: A comma-separated list of password line prefixes
(Default: `password,pass,secret`)
[3]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager"
[4]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store"
[5]: <https://github.com/passff/passff#multi-line-format> "Multi-line format"
### The `tfa` authenticator
This authenticator prompts the user on the console for a two-factor
@ -258,94 +203,63 @@ This authenticator does not support usernames.
Transformation rules are rules for renaming and excluding files and directories.
They are specified line-by-line in a crawler's `transform` option. When a
crawler needs to apply a rule to a path, it goes through this list top-to-bottom
and applies the first matching rule.
and choose the first matching rule.
To see this process in action, you can use the `--debug-transforms` or flag or
the `--explain` flag.
Each rule has the format `SOURCE ARROW TARGET` (e. g. `foo/bar --> foo/baz`).
The arrow specifies how the source and target are interpreted. The different
kinds of arrows are documented below.
Each line has the format `SOURCE ARROW TARGET` where `TARGET` is optional.
`SOURCE` is either a normal path without spaces (e. g. `foo/bar`), or a string
literal delimited by `"` or `'` (e. g. `"foo\" bar/baz"`). Python's string
escape syntax is supported. Trailing slashes are ignored. `TARGET` can be
formatted like `SOURCE`, but it can also be a single exclamation mark without
quotes (`!`). `ARROW` is one of `-->`, `-name->`, `-exact->`, `-re->` and
`-name-re->`
`SOURCE` and `TARGET` are either a bunch of characters without spaces (e. g.
`foo/bar`) or string literals (e. g, `"foo/b a r"`). The former syntax has no
concept of escaping characters, so the backslash is just another character. The
string literals however support Python's escape syntax (e. g.
`"foo\\bar\tbaz"`). This also means that in string literals, backslashes must be
escaped.
`TARGET` can additionally be a single exclamation mark `!` (*not* `"!"`). When a
rule with a `!` as target matches a path, the corresponding file or directory is
ignored by the crawler instead of renamed.
`TARGET` can also be omitted entirely. When a rule without target matches a
path, the path is returned unmodified. This is useful to prevent rules further
down from matching instead.
Each arrow's behaviour can be modified slightly by changing the arrow's head
from `>` to `>>`. When a rule with a `>>` arrow head matches a path, it doesn't
return immediately like a normal arrow. Instead, it replaces the current path
with its output and continues on to the next rule. In effect, this means that
multiple rules can be applied sequentially.
If a rule's target is `!`, this means that when the rule matches on a path, the
corresponding file or directory is ignored. If a rule's target is missing, the
path is matched but not modified.
### The `-->` arrow
The `-->` arrow is a basic renaming operation for files and directories. If a
path matches `SOURCE`, it is renamed to `TARGET`.
Example: `foo/bar --> baz`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Converts `foo/bar` into `baz`
- Converts `foo/bar/wargl` into `baz/wargl`
Example: `foo/bar --> !`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Ignores `foo/bar` and any of its children
The `-->` arrow is a basic renaming operation. If a path begins with `SOURCE`,
that part of the path is replaced with `TARGET`. This means that the rule
`foo/bar --> baz` would convert `foo/bar` into `baz`, but also `foo/bar/xyz`
into `baz/xyz`. The rule `foo --> !` would ignore a directory named `foo` as
well as all its contents.
### The `-name->` arrow
The `-name->` arrow lets you rename files and directories by their name,
regardless of where they appear in the file tree. Because of this, its `SOURCE`
must not contain multiple path segments, only a single name. This restriction
does not apply to its `TARGET`.
does not apply to its `TARGET`. The `-name->` arrow is not applied recursively
to its own output to prevent infinite loops.
Example: `foo -name-> bar/baz`
- Doesn't match `a/foobar/b` or `x/Foo/y/z`
- Converts `hello/foo` into `hello/bar/baz`
- Converts `foo/world` into `bar/baz/world`
- Converts `a/foo/b/c/foo` into `a/bar/baz/b/c/bar/baz`
Example: `foo -name-> !`
- Doesn't match `a/foobar/b` or `x/Foo/y/z`
- Ignores any path containing a segment `foo`
For example, the rule `foo -name-> bar/baz` would convert `a/foo` into
`a/bar/baz` and `a/foo/b/c/foo` into `a/bar/baz/b/c/bar/baz`. The rule `foo
-name-> !` would ignore all directories and files named `foo`.
### The `-exact->` arrow
The `-exact->` arrow requires the path to match `SOURCE` exactly. The examples
below show why this is useful.
Example: `foo/bar -exact-> baz`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Converts `foo/bar` into `baz`
- Doesn't match `foo/bar/wargl`
Example: `foo/bar -exact-> !`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Ignores only `foo/bar`, not its children
The `-exact->` arrow requires the path to match `SOURCE` exactly. This means
that the rule `foo/bar -exact-> baz` would still convert `foo/bar` into `baz`,
but `foo/bar/xyz` would be unaffected. Also, `foo -exact-> !` would only ignore
`foo`, but not its contents (if it has any). The examples below show why this is
useful.
### The `-re->` arrow
The `-re->` arrow is like the `-->` arrow but with regular expressions. `SOURCE`
is a regular expression and `TARGET` an f-string based template. If a path
matches `SOURCE`, the output path is created using `TARGET` as template.
`SOURCE` is automatically anchored.
The `-re->` arrow uses regular expressions. `SOURCE` is a regular expression
that must match the entire path. If this is the case, then the capturing groups
are available in `TARGET` for formatting.
`TARGET` uses Python's [format string syntax][6]. The *n*-th capturing group can
be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path.
`TARGET` uses Python's [format string syntax][3]. The *n*-th capturing group can
be referred to as `{g<n>}` (e. g. `{g3}`). `{g0}` refers to the original path.
If capturing group *n*'s contents are a valid integer, the integer value is
available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a
valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). If a
capturing group is not present (e.g. when matching the string `cd` with the
available as `{i<n>}` (e. g. `{i3}`). If capturing group *n*'s contents are a
valid float, the float value is available as `{f<n>}` (e. g. `{f3}`). If a
capturing group is not present (e. g. when matching the string `cd` with the
regex `(ab)?cd`), the corresponding variables are not defined.
Python's format string syntax has rich options for formatting its arguments. For
@ -355,37 +269,18 @@ can use `{i3:05}`.
PFERD even allows you to write entire expressions inside the curly braces, for
example `{g2.lower()}` or `{g3.replace(' ', '_')}`.
Example: `f(oo+)/be?ar -re-> B{g1.upper()}H/fear`
- Doesn't match `a/foo/bar`, `foo/abc/bar`, `afoo/bar` or `foo/bars`
- Converts `foo/bar` into `BOOH/fear`
- Converts `fooooo/bear` into `BOOOOOH/fear`
- Converts `foo/bar/baz` into `BOOH/fear/baz`
[6]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax"
[3]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax"
### The `-name-re->` arrow
The `-name-re>` arrow is like a combination of the `-name->` and `-re->` arrows.
Instead of the `SOURCE` being the name of a directory or file, it's a regex that
is matched against the names of directories and files. `TARGET` works like the
`-re->` arrow's target.
Example: `(.*)\.jpeg -name-re-> {g1}.jpg`
- Doesn't match `foo/bar.png`, `baz.JPEG` or `hello,jpeg`
- Converts `foo/bar.jpeg` into `foo/bar.jpg`
- Converts `foo.jpeg/bar/baz.jpeg` into `foo.jpg/bar/baz.jpg`
Example: `\..+ -name-re-> !`
- Doesn't match `.`, `test`, `a.b`
- Ignores all files and directories starting with `.`.
### The `-exact-re->` arrow
The `-exact-re>` arrow is like a combination of the `-exact->` and `-re->`
arrows.
Example: `f(oo+)/be?ar -exactre-> B{g1.upper()}H/fear`
- Doesn't match `a/foo/bar`, `foo/abc/bar`, `afoo/bar` or `foo/bars`
- Converts `foo/bar` into `BOOH/fear`
- Converts `fooooo/bear` into `BOOOOOH/fear`
- Doesn't match `foo/bar/baz`
For example, the arrow `(.*)\.jpeg -name-re-> {g1}.jpg` will rename all `.jpeg`
extensions into `.jpg`. The arrow `\..+ -name-re-> !` will ignore all files and
directories starting with `.`.
### Example: Tutorials
@ -412,7 +307,8 @@ tutorials --> !
The second rule is required for many crawlers since they use the rules to decide
which directories to crawl. If it was missing when the crawler looks at
`tutorials/`, the third rule would match. This means the crawler would not crawl
the `tutorials/` directory and thus not discover that `tutorials/tut02/` exists.
the `tutorials/` directory and thus not discover that `tutorials/tut02/`
existed.
Since the second rule is only relevant for crawling, the `TARGET` is left out.
@ -437,9 +333,9 @@ To do this, you can use the most powerful of arrows: The regex arrow.
Note the escaped backslashes on the `SOURCE` side.
### Example: Crawl a Python project
### Example: Crawl a python project
You are crawling a Python project and want to ignore all hidden files (files
You are crawling a python project and want to ignore all hidden files (files
whose name starts with a `.`), all `__pycache__` directories and all markdown
files (for some weird reason).
@ -459,21 +355,11 @@ README.md
...
```
For this task, the name arrows can be used.
For this task, the name arrows can be used. They are variants of the normal
arrows that only look at the file name instead of the entire path.
```
\..* -name-re-> !
__pycache__ -name-> !
.*\.md -name-re-> !
```
### Example: Clean up names
You want to convert all paths into lowercase and replace spaces with underscores
before applying any rules. This can be achieved using the `>>` arrow heads.
```
(.*) -re->> "{g1.lower().replace(' ', '_')}"
<other rules go here>
```

View File

@ -1,6 +1,4 @@
Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -5,8 +5,7 @@ import os
import sys
from pathlib import Path
from .auth import AuthLoadError
from .cli import PARSER, ParserLoadError, load_default_section
from .cli import PARSER, load_default_section
from .config import Config, ConfigDumpError, ConfigLoadError, ConfigOptionError
from .logging import log
from .pferd import Pferd, PferdLoadError
@ -15,7 +14,7 @@ from .transformer import RuleParseError
def load_config_parser(args: argparse.Namespace) -> configparser.ConfigParser:
log.explain_topic("Loading config")
parser = configparser.ConfigParser(interpolation=None)
parser = configparser.ConfigParser()
if args.command is None:
log.explain("No CLI command specified, loading config from file")
@ -37,9 +36,6 @@ def load_config(args: argparse.Namespace) -> Config:
log.error(str(e))
log.error_contd(e.reason)
sys.exit(1)
except ParserLoadError as e:
log.error(str(e))
sys.exit(1)
def configure_logging_from_args(args: argparse.Namespace) -> None:
@ -47,8 +43,6 @@ def configure_logging_from_args(args: argparse.Namespace) -> None:
log.output_explain = args.explain
if args.status is not None:
log.output_status = args.status
if args.show_not_deleted is not None:
log.output_not_deleted = args.show_not_deleted
if args.report is not None:
log.output_report = args.report
@ -74,8 +68,6 @@ def configure_logging_from_config(args: argparse.Namespace, config: Config) -> N
log.output_status = config.default_section.status()
if args.report is None:
log.output_report = config.default_section.report()
if args.show_not_deleted is None:
log.output_not_deleted = config.default_section.show_not_deleted()
except ConfigOptionError as e:
log.error(str(e))
sys.exit(1)
@ -120,7 +112,7 @@ def main() -> None:
sys.exit()
try:
pferd = Pferd(config, args.crawler, args.skip)
pferd = Pferd(config, args.crawler)
except PferdLoadError as e:
log.unlock()
log.error(str(e))
@ -139,7 +131,7 @@ def main() -> None:
loop.close()
else:
asyncio.run(pferd.run(args.debug_transforms))
except (ConfigOptionError, AuthLoadError) as e:
except ConfigOptionError as e:
log.unlock()
log.error(str(e))
sys.exit(1)
@ -151,6 +143,7 @@ def main() -> None:
log.unlock()
log.explain_topic("Interrupted, exiting immediately")
log.explain("Open files and connections are left for the OS to clean up")
log.explain("Temporary files are not cleaned up")
pferd.print_report()
# TODO Clean up tmp files
# And when those files *do* actually get cleaned up properly,
@ -163,7 +156,3 @@ def main() -> None:
sys.exit(1)
else:
pferd.print_report()
if __name__ == "__main__":
main()

View File

@ -2,10 +2,8 @@ from configparser import SectionProxy
from typing import Callable, Dict
from ..config import Config
from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401
from .credential_file import CredentialFileAuthenticator, CredentialFileAuthSection
from .authenticator import Authenticator, AuthError, AuthSection # noqa: F401
from .keyring import KeyringAuthenticator, KeyringAuthSection
from .pass_ import PassAuthenticator, PassAuthSection
from .simple import SimpleAuthenticator, SimpleAuthSection
from .tfa import TfaAuthenticator
@ -16,14 +14,10 @@ AuthConstructor = Callable[[
], Authenticator]
AUTHENTICATORS: Dict[str, AuthConstructor] = {
"credential-file": lambda n, s, c:
CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c),
"keyring": lambda n, s, c:
KeyringAuthenticator(n, KeyringAuthSection(s)),
"pass": lambda n, s, c:
PassAuthenticator(n, PassAuthSection(s)),
"simple": lambda n, s, c:
SimpleAuthenticator(n, SimpleAuthSection(s)),
"tfa": lambda n, s, c:
TfaAuthenticator(n),
"keyring": lambda n, s, c:
KeyringAuthenticator(n, KeyringAuthSection(s))
}

View File

@ -13,15 +13,14 @@ class AuthError(Exception):
class AuthSection(Section):
def type(self) -> str:
value = self.s.get("type")
if value is None:
self.missing_value("type")
return value
pass
class Authenticator(ABC):
def __init__(self, name: str) -> None:
def __init__(
self,
name: str
) -> None:
"""
Initialize an authenticator from its name and its section in the config
file.

View File

@ -1,46 +0,0 @@
from pathlib import Path
from typing import Tuple
from ..config import Config
from ..utils import fmt_real_path
from .authenticator import Authenticator, AuthLoadError, AuthSection
class CredentialFileAuthSection(AuthSection):
def path(self) -> Path:
value = self.s.get("path")
if value is None:
self.missing_value("path")
return Path(value)
class CredentialFileAuthenticator(Authenticator):
def __init__(self, name: str, section: CredentialFileAuthSection, config: Config) -> None:
super().__init__(name)
path = config.default_section.working_dir() / section.path()
try:
with open(path, encoding="utf-8") as f:
lines = list(f)
except UnicodeDecodeError:
raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8")
except OSError as e:
raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e
if len(lines) != 2:
raise AuthLoadError("Credential file must be two lines long")
[uline, pline] = lines
uline = uline[:-1] # Remove trailing newline
if pline.endswith("\n"):
pline = pline[:-1]
if not uline.startswith("username="):
raise AuthLoadError("First line must start with 'username='")
if not pline.startswith("password="):
raise AuthLoadError("Second line must start with 'password='")
self._username = uline[9:]
self._password = pline[9:]
async def credentials(self) -> Tuple[str, str]:
return self._username, self._password

View File

@ -18,7 +18,11 @@ class KeyringAuthSection(AuthSection):
class KeyringAuthenticator(Authenticator):
def __init__(self, name: str, section: KeyringAuthSection) -> None:
def __init__(
self,
name: str,
section: KeyringAuthSection,
) -> None:
super().__init__(name)
self._username = section.username()

View File

@ -1,98 +0,0 @@
import re
import subprocess
from typing import List, Tuple
from ..logging import log
from .authenticator import Authenticator, AuthError, AuthSection
class PassAuthSection(AuthSection):
def passname(self) -> str:
if (value := self.s.get("passname")) is None:
self.missing_value("passname")
return value
def username_prefixes(self) -> List[str]:
value = self.s.get("username_prefixes", "login,username,user")
return [prefix.lower() for prefix in value.split(",")]
def password_prefixes(self) -> List[str]:
value = self.s.get("password_prefixes", "password,pass,secret")
return [prefix.lower() for prefix in value.split(",")]
class PassAuthenticator(Authenticator):
PREFIXED_LINE_RE = r"([a-zA-Z]+):\s?(.*)" # to be used with fullmatch
def __init__(self, name: str, section: PassAuthSection) -> None:
super().__init__(name)
self._passname = section.passname()
self._username_prefixes = section.username_prefixes()
self._password_prefixes = section.password_prefixes()
async def credentials(self) -> Tuple[str, str]:
log.explain_topic("Obtaining credentials from pass")
try:
log.explain(f"Calling 'pass show {self._passname}'")
result = subprocess.check_output(["pass", "show", self._passname], text=True)
except subprocess.CalledProcessError as e:
raise AuthError(f"Failed to get password info from {self._passname}: {e}")
prefixed = {}
unprefixed = []
for line in result.strip().splitlines():
if match := re.fullmatch(self.PREFIXED_LINE_RE, line):
prefix = match.group(1).lower()
value = match.group(2)
log.explain(f"Found prefixed line {line!r} with prefix {prefix!r}, value {value!r}")
if prefix in prefixed:
raise AuthError(f"Prefix {prefix} specified multiple times")
prefixed[prefix] = value
else:
log.explain(f"Found unprefixed line {line!r}")
unprefixed.append(line)
username = None
for prefix in self._username_prefixes:
log.explain(f"Looking for username at prefix {prefix!r}")
if prefix in prefixed:
username = prefixed[prefix]
log.explain(f"Found username {username!r}")
break
password = None
for prefix in self._password_prefixes:
log.explain(f"Looking for password at prefix {prefix!r}")
if prefix in prefixed:
password = prefixed[prefix]
log.explain(f"Found password {password!r}")
break
if password is None and username is None:
log.explain("No username and password found so far")
log.explain("Using first unprefixed line as password")
log.explain("Using second unprefixed line as username")
elif password is None:
log.explain("No password found so far")
log.explain("Using first unprefixed line as password")
elif username is None:
log.explain("No username found so far")
log.explain("Using first unprefixed line as username")
if password is None:
if not unprefixed:
log.explain("Not enough unprefixed lines left")
raise AuthError("Password could not be determined")
password = unprefixed.pop(0)
log.explain(f"Found password {password!r}")
if username is None:
if not unprefixed:
log.explain("Not enough unprefixed lines left")
raise AuthError("Username could not be determined")
username = unprefixed.pop(0)
log.explain(f"Found username {username!r}")
return username, password

View File

@ -14,7 +14,11 @@ class SimpleAuthSection(AuthSection):
class SimpleAuthenticator(Authenticator):
def __init__(self, name: str, section: SimpleAuthSection) -> None:
def __init__(
self,
name: str,
section: SimpleAuthSection,
) -> None:
super().__init__(name)
self._username = section.username()

View File

@ -6,7 +6,10 @@ from .authenticator import Authenticator, AuthError
class TfaAuthenticator(Authenticator):
def __init__(self, name: str) -> None:
def __init__(
self,
name: str,
) -> None:
super().__init__(name)
async def username(self) -> str:

View File

@ -1,13 +1,11 @@
# isort: skip_file
# The order of imports matters because each command module registers itself
# with the parser from ".parser" and the import order affects the order in
# which they appear in the help. Because of this, isort is disabled for this
# with the parser from ".parser". Because of this, isort is disabled for this
# file. Also, since we're reexporting or just using the side effect of
# importing itself, we get a few linting warnings, which we're disabling as
# well.
from . import command_local # noqa: F401 imported but unused
from . import command_kit_ilias_web # noqa: F401 imported but unused
from . import command_kit_ipd # noqa: F401 imported but unused
from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused
from .parser import PARSER, load_default_section # noqa: F401 imported but unused

View File

@ -4,8 +4,7 @@ from pathlib import Path
from ..crawl.ilias.file_templates import Links
from ..logging import log
from .parser import (CRAWLER_PARSER, SUBPARSERS, BooleanOptionalAction, ParserLoadError, load_crawler,
show_value_error)
from .parser import CRAWLER_PARSER, SUBPARSERS, BooleanOptionalAction, load_crawler, show_value_error
SUBPARSER = SUBPARSERS.add_parser(
"kit-ilias-web",
@ -39,12 +38,6 @@ GROUP.add_argument(
action=BooleanOptionalAction,
help="use the system keyring to store and retrieve passwords"
)
GROUP.add_argument(
"--credential-file",
type=Path,
metavar="PATH",
help="read username and password from a credential file"
)
GROUP.add_argument(
"--links",
type=show_value_error(Links.from_string),
@ -62,11 +55,6 @@ GROUP.add_argument(
action=BooleanOptionalAction,
help="crawl and download videos"
)
GROUP.add_argument(
"--forums",
action=BooleanOptionalAction,
help="crawl and download forum posts"
)
GROUP.add_argument(
"--http-timeout", "-t",
type=float,
@ -95,26 +83,16 @@ def load(
section["link_redirect_delay"] = str(args.link_redirect_delay)
if args.videos is not None:
section["videos"] = "yes" if args.videos else "no"
if args.forums is not None:
section["forums"] = "yes" if args.forums else "no"
if args.http_timeout is not None:
section["http_timeout"] = str(args.http_timeout)
parser["auth:ilias"] = {}
auth_section = parser["auth:ilias"]
if args.credential_file is not None:
if args.username is not None:
raise ParserLoadError("--credential-file and --username can't be used together")
if args.keyring:
raise ParserLoadError("--credential-file and --keyring can't be used together")
auth_section["type"] = "credential-file"
auth_section["path"] = str(args.credential_file)
elif args.keyring:
auth_section["type"] = "keyring"
else:
auth_section["type"] = "simple"
if args.username is not None:
auth_section["username"] = args.username
if args.keyring:
auth_section["type"] = "keyring"
SUBPARSER.set_defaults(command=load)

View File

@ -1,54 +0,0 @@
import argparse
import configparser
from pathlib import Path
from ..logging import log
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
SUBPARSER = SUBPARSERS.add_parser(
"kit-ipd",
parents=[CRAWLER_PARSER],
)
GROUP = SUBPARSER.add_argument_group(
title="kit ipd crawler arguments",
description="arguments for the 'kit-ipd' crawler",
)
GROUP.add_argument(
"--link-regex",
type=str,
metavar="REGEX",
help="href-matching regex to identify downloadable files"
)
GROUP.add_argument(
"target",
type=str,
metavar="TARGET",
help="url to crawl"
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain("Creating config for command 'kit-ipd'")
parser["crawl:kit-ipd"] = {}
section = parser["crawl:kit-ipd"]
load_crawler(args, section)
section["type"] = "kit-ipd"
section["target"] = str(args.target)
section["output_dir"] = str(args.output)
if args.link_regex:
section["link_regex"] = str(args.link_regex)
SUBPARSER.set_defaults(command=load)

View File

@ -8,10 +8,6 @@ from ..output_dir import OnConflict, Redownload
from ..version import NAME, VERSION
class ParserLoadError(Exception):
pass
# TODO Replace with argparse version when updating to 3.9?
class BooleanOptionalAction(argparse.Action):
def __init__(
@ -181,14 +177,6 @@ PARSER.add_argument(
help="only execute a single crawler."
" Can be specified multiple times to execute multiple crawlers"
)
PARSER.add_argument(
"--skip", "-S",
action="append",
type=str,
metavar="NAME",
help="don't execute this particular crawler."
" Can be specified multiple times to skip multiple crawlers"
)
PARSER.add_argument(
"--working-dir",
type=Path,
@ -215,11 +203,6 @@ PARSER.add_argument(
action=BooleanOptionalAction,
help="whether crawlers should share cookies where applicable"
)
PARSER.add_argument(
"--show-not-deleted",
action=BooleanOptionalAction,
help="print messages in status and report when PFERD did not delete a local only file"
)
def load_default_section(
@ -238,8 +221,6 @@ def load_default_section(
section["report"] = "yes" if args.report else "no"
if args.share_cookies is not None:
section["share_cookies"] = "yes" if args.share_cookies else "no"
if args.show_not_deleted is not None:
section["show_not_deleted"] = "yes" if args.show_not_deleted else "no"
SUBPARSERS = PARSER.add_subparsers(title="crawlers")

View File

@ -69,7 +69,6 @@ class Section:
class DefaultSection(Section):
def working_dir(self) -> Path:
# TODO Change to working dir instead of manually prepending it to paths
pathstr = self.s.get("working_dir", ".")
return Path(pathstr).expanduser()
@ -82,9 +81,6 @@ class DefaultSection(Section):
def report(self) -> bool:
return self.s.getboolean("report", fallback=True)
def show_not_deleted(self) -> bool:
return self.s.getboolean("show_not_deleted", fallback=True)
def share_cookies(self) -> bool:
return self.s.getboolean("share_cookies", fallback=True)
@ -123,7 +119,7 @@ class Config:
# Using config.read_file instead of config.read because config.read
# would just ignore a missing file and carry on.
try:
with open(path, encoding="utf-8") as f:
with open(path) as f:
parser.read_file(f, source=str(path))
except FileNotFoundError:
raise ConfigLoadError(path, "File does not exist")
@ -131,8 +127,6 @@ class Config:
raise ConfigLoadError(path, "That's a directory, not a file")
except PermissionError:
raise ConfigLoadError(path, "Insufficient permissions")
except UnicodeDecodeError:
raise ConfigLoadError(path, "File is not encoded using UTF-8")
def dump(self, path: Optional[Path] = None) -> None:
"""
@ -159,12 +153,12 @@ class Config:
try:
# x = open for exclusive creation, failing if the file already
# exists
with open(path, "x", encoding="utf-8") as f:
with open(path, "x") as f:
self._parser.write(f)
except FileExistsError:
print("That file already exists.")
if asyncio.run(prompt_yes_no("Overwrite it?", default=False)):
with open(path, "w", encoding="utf-8") as f:
with open(path, "w") as f:
self._parser.write(f)
else:
raise ConfigDumpError(path, "File already exists")

View File

@ -3,9 +3,8 @@ from typing import Callable, Dict
from ..auth import Authenticator
from ..config import Config
from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .crawler import Crawler, CrawlError # noqa: F401
from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection
CrawlerConstructor = Callable[[
@ -20,6 +19,4 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
LocalCrawler(n, LocalCrawlerSection(s), c),
"kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a:
KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
}

View File

@ -1,10 +1,9 @@
import asyncio
import os
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Coroutine
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from ..auth import Authenticator
from ..config import Config, Section
@ -48,18 +47,16 @@ def noncritical(f: Wrapped) -> Wrapped:
try:
f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
crawler.report.add_warning(str(e))
log.warn(str(e))
crawler.error_free = False
except Exception as e:
except: # noqa: E722 do not use bare 'except'
crawler.error_free = False
crawler.report.add_error(str(e))
raise
return wrapper # type: ignore
AWrapped = TypeVar("AWrapped", bound=Callable[..., Coroutine[Any, Any, Optional[Any]]])
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]])
def anoncritical(f: AWrapped) -> AWrapped:
@ -75,25 +72,21 @@ def anoncritical(f: AWrapped) -> AWrapped:
Warning: Must only be applied to member functions of the Crawler class!
"""
async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
async def wrapper(*args: Any, **kwargs: Any) -> None:
if not (args and isinstance(args[0], Crawler)):
raise RuntimeError("@anoncritical must only applied to Crawler methods")
crawler = args[0]
try:
return await f(*args, **kwargs)
await f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
log.warn(str(e))
crawler.error_free = False
crawler.report.add_warning(str(e))
except Exception as e:
except: # noqa: E722 do not use bare 'except'
crawler.error_free = False
crawler.report.add_error(str(e))
raise
return None
return wrapper # type: ignore
@ -139,15 +132,6 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
class CrawlerSection(Section):
def type(self) -> str:
value = self.s.get("type")
if value is None:
self.missing_value("type")
return value
def skip(self) -> bool:
return self.s.getboolean("skip", fallback=False)
def output_dir(self, name: str) -> Path:
# TODO Use removeprefix() after switching to 3.9
if name.startswith("crawl:"):
@ -325,7 +309,6 @@ class Crawler(ABC):
log.explain("Warnings or errors occurred during this run")
log.explain("Answer: No")
@anoncritical
async def run(self) -> None:
"""
Start the crawling process. Call this function if you want to use a

View File

@ -1,8 +1,7 @@
import asyncio
import http.cookies
import ssl
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional
import aiohttp
import certifi
@ -106,25 +105,6 @@ class HttpCrawler(Crawler):
self._shared_cookie_jar_paths.append(self._cookie_jar_path)
def _load_cookies_from_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
with open(path, encoding="utf-8") as f:
for i, line in enumerate(f):
# Names of headers are case insensitive
if line[:11].lower() == "set-cookie:":
jar.load(line[11:])
else:
log.explain(f"Line {i} doesn't start with 'Set-Cookie:', ignoring it")
self._cookie_jar.update_cookies(jar)
def _save_cookies_to_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
for morsel in self._cookie_jar:
jar[morsel.key] = morsel
with open(path, "w", encoding="utf-8") as f:
f.write(jar.output(sep="\n"))
f.write("\n") # A trailing newline is just common courtesy
def _load_cookies(self) -> None:
log.explain_topic("Loading cookies")
@ -154,7 +134,7 @@ class HttpCrawler(Crawler):
log.explain(f"Loading cookies from {fmt_real_path(cookie_jar_path)}")
try:
self._load_cookies_from_file(cookie_jar_path)
self._cookie_jar.load(cookie_jar_path)
except Exception as e:
log.explain("Failed to load cookies")
log.explain(str(e))
@ -164,7 +144,7 @@ class HttpCrawler(Crawler):
try:
log.explain(f"Saving cookies to {fmt_real_path(self._cookie_jar_path)}")
self._save_cookies_to_file(self._cookie_jar_path)
self._cookie_jar.save(self._cookie_jar_path)
except Exception as e:
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e))

View File

@ -1,10 +1,6 @@
from enum import Enum
from typing import Optional
import bs4
from PFERD.utils import soupify
_link_template_plain = "{{link}}"
_link_template_fancy = """
<!DOCTYPE html>
@ -98,71 +94,6 @@ _link_template_internet_shortcut = """
URL={{link}}
""".strip()
_learning_module_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>{{name}}</title>
</head>
<style>
* {
box-sizing: border-box;
}
.center-flex {
display: flex;
align-items: center;
justify-content: center;
}
.nav {
display: flex;
justify-content: space-between;
}
</style>
<body class="center-flex">
{{body}}
</body>
</html>
"""
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those.
for elem in body.select(".il-copg-mob-fullscreen-modal"):
elem.decompose()
nav_template = """
<div class="nav">
{{left}}
{{right}}
</div>
"""
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip()
left = f'<a href="{prev}">{text}</a>'
else:
left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip()
right = f'<a href="{next}">{text}</a>'
else:
right = "<span></span>"
if top_nav := body.select_one(".ilc_page_tnav_TopNavigation"):
top_nav.replace_with(
soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode())
)
if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"):
bot_nav.replace_with(soupify(nav_template.replace(
"{{left}}", left).replace("{{right}}", right).encode())
)
body = body.prettify()
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name)
class Links(Enum):
IGNORE = "ignore"
@ -171,24 +102,24 @@ class Links(Enum):
INTERNET_SHORTCUT = "internet-shortcut"
def template(self) -> Optional[str]:
if self == Links.FANCY:
if self == self.FANCY:
return _link_template_fancy
elif self == Links.PLAINTEXT:
elif self == self.PLAINTEXT:
return _link_template_plain
elif self == Links.INTERNET_SHORTCUT:
elif self == self.INTERNET_SHORTCUT:
return _link_template_internet_shortcut
elif self == Links.IGNORE:
elif self == self.IGNORE:
return None
raise ValueError("Missing switch case")
def extension(self) -> Optional[str]:
if self == Links.FANCY:
if self == self.FANCY:
return ".html"
elif self == Links.PLAINTEXT:
elif self == self.PLAINTEXT:
return ".txt"
elif self == Links.INTERNET_SHORTCUT:
elif self == self.INTERNET_SHORTCUT:
return ".url"
elif self == Links.IGNORE:
elif self == self.IGNORE:
return None
raise ValueError("Missing switch case")

View File

@ -1,91 +0,0 @@
from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """
.ilc_text_block_Information {
background-color: #f5f7fa;
}
div.ilc_text_block_Standard {
margin-bottom: 10px;
margin-top: 10px;
}
span.ilc_text_inline_Strong {
font-weight: bold;
}
.accordion-head {
background-color: #f5f7fa;
padding: 0.5rem 0;
}
h3 {
margin-top: 0.5rem;
margin-bottom: 1rem;
}
br.visible-break {
margin-bottom: 1rem;
}
article {
margin: 0.5rem 0;
}
body {
padding: 1em;
grid-template-columns: 1fr min(60rem, 90%) 1fr;
line-height: 1.2;
}
"""
_ARTICLE_WORTHY_CLASSES = [
"ilc_text_block_Information",
"ilc_section_Attention",
"ilc_section_Link",
]
def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
head = soup.new_tag("head")
soup.insert(0, head)
simplecss_link: Tag = soup.new_tag("link")
# <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css">
simplecss_link["rel"] = "stylesheet"
simplecss_link["href"] = "https://cdn.simplecss.org/simple.css"
head.append(simplecss_link)
# Basic style tags for compat
style: Tag = soup.new_tag("style")
style.append(_STYLE_TAG_CONTENT)
head.append(style)
return soup
def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES):
block.name = "article"
for block in soup.find_all("h3"):
block.name = "div"
for block in soup.find_all("h1"):
block.name = "h3"
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"):
block.name = "h3"
block["class"] += ["accordion-head"]
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children)
if not children:
dummy.decompose()
if len(children) > 1:
continue
if isinstance(type(children[0]), Comment):
dummy.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr"))
return soup

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Union, cast
from typing import List, Optional, Union
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
@ -22,18 +22,11 @@ class IliasElementType(Enum):
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module"
BOOKING = "booking"
MEETING = "meeting"
SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEDIACAST_VIDEO = "mediacast_video"
OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
VIDEO = "video"
VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder"
VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
@dataclass
@ -44,46 +37,6 @@ class IliasPageElement:
mtime: Optional[datetime] = None
description: Optional[str] = None
def id(self) -> str:
regexes = [
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
]
for regex in regexes:
if match := re.search(regex, self.url):
return match.groupdict()["id"]
# Fall back to URL
log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
return self.url
@dataclass
class IliasDownloadForumData:
url: str
form_data: Dict[str, Union[str, List[str]]]
empty: bool
@dataclass
class IliasForumThread:
title: str
title_tag: Tag
content_tag: Tag
mtime: Optional[datetime]
@dataclass
class IliasLearningModulePage:
title: str
content: Tag
next_url: Optional[str]
previous_url: Optional[str]
class IliasPage:
@ -93,16 +46,6 @@ class IliasPage:
self._page_type = source_element.type if source_element else None
self._source_name = source_element.name if source_element else ""
@staticmethod
def is_root_page(soup: BeautifulSoup) -> bool:
permalink = soup.find(id="current_perma_link")
if permalink is None:
return False
value = permalink.attrs.get("value")
if value is None:
return False
return "goto.php?target=root_" in value
def get_child_elements(self) -> List[IliasPageElement]:
"""
Return all child page elements you can find here.
@ -110,141 +53,24 @@ class IliasPage:
if self._is_video_player():
log.explain("Page is a video player, extracting URL")
return self._player_to_video()
if self._is_opencast_video_listing():
log.explain("Page is an opencast video listing, searching for elements")
return self._find_opencast_video_entries()
if self._is_video_listing():
log.explain("Page is a video listing, searching for elements")
return self._find_video_entries()
if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries()
if self._is_personal_desktop():
log.explain("Page is the personal desktop, searching for elements")
return self._find_personal_desktop_entries()
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
if self._is_info_tab():
log.explain("Page is info tab, searching for elements")
return self._find_info_tab_entries()
log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = self._soup.find(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class)
if not paragraphs:
return None
# Extract bits and pieces into a string and parse it again.
# This ensures we don't miss anything and weird structures are resolved
# somewhat gracefully.
raw_html = ""
for p in paragraphs:
if p.find_parent(class_=is_interesting_class):
continue
# Ignore special listings (like folder groupings)
if "ilc_section_Special" in p["class"]:
continue
raw_html += str(p) + "\n"
raw_html = f"<body>\n{raw_html}\n</body>"
return BeautifulSoup(raw_html, "html.parser")
def get_learning_module_data(self) -> Optional[IliasLearningModulePage]:
if not self._is_learning_module_page():
return None
content = self._soup.select_one("#ilLMPageContent")
title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip()
return IliasLearningModulePage(
title=title,
content=content,
next_url=self._find_learning_module_next(),
previous_url=self._find_learning_module_prev()
)
def _find_learning_module_next(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_rnavlink_RightNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def _find_learning_module_prev(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_lnavlink_LeftNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def get_download_forum_data(self) -> Optional[IliasDownloadForumData]:
form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x})
if not form:
return None
post_url = self._abs_url_from_relative(form["action"])
thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})]
form_data: Dict[str, Union[str, List[str]]] = {
"thread_ids[]": thread_ids,
"selected_cmd2": "html",
"select_cmd2": "Ausführen",
"selected_cmd": "",
}
return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0)
def get_next_stage_element(self) -> Optional[IliasPageElement]:
if self._is_forum_page():
if "trows=800" in self._page_url:
return None
log.explain("Requesting *all* forum threads")
return self._get_show_max_forum_entries_per_page_url()
def get_next_stage_url(self) -> Optional[str]:
if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0]
if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination")
return self._find_opencast_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected():
if self._page_type != IliasElementType.INFO_TAB:
log.explain("Selecting content tab")
return self._select_content_page_url()
else:
log.explain("Crawling info tab, skipping content select")
return self.get_child_elements()[0].url
return None
def _is_forum_page(self) -> bool:
read_more_btn = self._soup.find(
"button",
attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x}
)
return read_more_btn is not None
def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup)
def _is_opencast_video_listing(self) -> bool:
def _is_video_listing(self) -> bool:
if self._is_ilias_opencast_embedding():
return True
@ -275,66 +101,13 @@ class IliasPage:
return False
def _is_personal_desktop(self) -> bool:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
def _is_content_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=copa_" in link.get("value")
def _is_learning_module_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=pg_" in link.get("value")
def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = self._soup.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
)
if not element:
return None
link = self._abs_url_from_link(element)
return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings")
def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None
def _is_info_tab(self) -> bool:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find(
id="tab_view_content",
attrs={"class": lambda x: x is not None and "active" not in x}
)
# Already selected (or not found)
if not tab:
return None
link = tab.find("a")
if link:
link = self._abs_url_from_link(link)
return IliasPageElement(IliasElementType.FOLDER, link, "select content page")
_unexpected_html_warning()
log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.")
log.warn_contd("PFERD might not find content on the course's main page.")
return None
def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex = re.compile(
regex: re.Pattern[str] = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(self._soup))
@ -346,96 +119,11 @@ class IliasPage:
# parse it
json_object = json.loads(json_str)
streams = [stream for stream in json_object["streams"]]
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name))
return items
def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]:
correct_link = self._soup.find(
"a",
attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x}
)
if not correct_link:
return None
link = self._abs_url_from_link(correct_link)
return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads")
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select(".il-item-title")
for title in titles:
link = title.find("a")
name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link)
type = self._find_type_from_link(name, link, url)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {link}")
continue
log.explain(f"Found {name!r}")
if type == IliasElementType.FILE and "_download" not in url:
url = re.sub(r"(target=file_\d+)", r"\1_download", url)
log.explain("Rewired file URL to include download part")
items.append(IliasPageElement(type, url, name))
return items
def _find_copa_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink")
for link in links:
url = self._abs_url_from_link(link)
name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "")
name = _sanitize_path_name(name)
if "file_id" not in url:
_unexpected_html_warning()
log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
continue
items.append(IliasPageElement(IliasElementType.FILE, url, name))
return items
def _find_info_tab_entries(self) -> List[IliasPageElement]:
items = []
links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")
for link in links:
if "cmdClass=ilobjcoursegui" not in link["href"]:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.getText())
))
return items
def _find_opencast_video_entries(self) -> List[IliasPageElement]:
def _find_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated
@ -455,27 +143,27 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next")
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_opencast_video_entries_paginated()
return self._find_video_entries_paginated()
return self._find_opencast_video_entries_no_paging()
return self._find_video_entries_no_paging()
def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]:
def _find_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
return self._find_opencast_video_entries_no_paging()
return self._find_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None:
log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
return self._find_opencast_video_entries_no_paging()
return self._find_video_entries_no_paging()
table_id = id_match.group(1)
@ -484,42 +172,31 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]
return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
# Video start links are marked with an "Abspielen" link
video_links: List[Tag] = self._soup.findAll(
name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")
name="a", text=re.compile(r"\s*Abspielen\s*")
)
results: List[IliasPageElement] = []
for link in video_links:
results.append(self._listed_opencast_video_to_element(link))
results.append(self._listed_video_to_element(link))
return results
def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement:
def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
modification_time = None
row: Tag = link.parent.parent.parent
column_count = len(row.select("td.std"))
for index in range(column_count, 0, -1):
# 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one(
f"td.std:nth-child({index})"
"td.std:nth-child(6)"
).getText().strip()
if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
break
if modification_time is None:
log.warn(f"Could not determine upload time for {link}")
modification_time = datetime.now()
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
title += ".mp4"
@ -529,9 +206,7 @@ class IliasPage:
video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}")
return IliasPageElement(
IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
)
return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
def _find_exercise_entries(self) -> List[IliasPageElement]:
if self._soup.find(id="tab_submission"):
@ -555,16 +230,12 @@ class IliasPage:
parent_row: Tag = link.findParent("tr")
children: List[Tag] = parent_row.findChildren("td")
# <checkbox> <name> <uploader> <date> <download>
# 0 1 2 3 4
name = _sanitize_path_name(children[1].getText().strip())
date = demangle_date(children[3].getText().strip())
log.explain(f"Found exercise detail entry {name!r}")
for child in reversed(children):
date = demangle_date(child.getText().strip(), fail_silently=True)
if date is not None:
break
if date is None:
log.warn(f"Date parsing failed for exercise entry {name!r}")
results.append(IliasPageElement(
IliasElementType.FILE,
self._abs_url_from_link(link),
@ -618,13 +289,7 @@ class IliasPage:
# Add each listing as a new
for listing in file_listings:
parent_container: Tag = listing.findParent(
"div", attrs={"class": lambda x: x and "form-group" in x}
)
label_container: Tag = parent_container.find(
attrs={"class": lambda x: x and "control-label" in x}
)
file_name = _sanitize_path_name(label_container.getText().strip())
file_name = _sanitize_path_name(listing.getText().strip())
url = self._abs_url_from_link(listing)
log.explain(f"Found exercise detail {file_name!r} at {url}")
results.append(IliasPageElement(
@ -654,12 +319,6 @@ class IliasPage:
element_type = self._find_type_from_link(element_name, link, abs_url)
description = self._find_link_description(link)
# The last meeting on every page is expanded by default.
# Its content is then shown inline *and* in the meeting page itself.
# We should skip the inline content.
if element_type != IliasElementType.MEETING and self._is_in_expanded_meeting(link):
continue
if not element_type:
continue
if element_type == IliasElementType.MEETING:
@ -673,69 +332,8 @@ class IliasPage:
log.explain(f"Found {element_name!r}")
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
result += self._find_mediacast_videos()
return result
def _find_mediacast_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
element_name = _sanitize_path_name(
elem.select_one(".ilPlayerPreviewDescription").getText().strip()
)
if not element_name.endswith(".mp4"):
# just to make sure it has some kinda-alrightish ending
element_name = element_name + ".mp4"
video_element = elem.find(name="video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue
videos.append(IliasPageElement(
type=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")),
name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
return None
meta_tag: Tag = description_td.find_all("p")[-1]
if not meta_tag:
return None
updated_str = meta_tag.getText().strip().replace("\n", " ")
updated_str = re.sub(".+?: ", "", updated_str)
return demangle_date(updated_str)
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
"""
Returns whether a file is part of an expanded meeting.
Has false positives for meetings themselves as their title is also "in the expanded meeting content".
It is in the same general div and this whole thing is guesswork.
Therefore, you should check for meetings before passing them in this function.
"""
parents: List[Tag] = list(tag.parents)
for parent in parents:
if not parent.get("class"):
continue
# We should not crawl files under meetings
if "ilContainerListItemContentCB" in parent.get("class"):
link: Tag = parent.parent.find("a")
type = IliasPage._find_type_from_folder_like(link, self._page_url)
return type == IliasElementType.MEETING
return False
def _find_upwards_folder_hierarchy(self, tag: Tag) -> List[str]:
"""
Interprets accordions and expandable blocks as virtual folders and returns them
@ -761,10 +359,7 @@ class IliasPage:
continue
prev: Tag = parent.findPreviousSibling("div")
if "ilContainerBlockHeader" in prev.get("class"):
if prev.find("h3"):
found_titles.append(prev.find("h3").getText().strip())
else:
found_titles.append(prev.find("h2").getText().strip())
# And this for real accordions
if "il_VAccordionContentDef" in parent.get("class"):
@ -819,102 +414,6 @@ class IliasPage:
log.explain(f"Found file {full_path!r}")
return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
def _find_cards(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
card_titles: List[Tag] = self._soup.select(".card-title a")
for title in card_titles:
url = self._abs_url_from_link(title)
name = _sanitize_path_name(title.getText().strip())
type = self._find_type_from_card(title)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {title}")
continue
result.append(IliasPageElement(type, url, name))
card_button_tiles: List[Tag] = self._soup.select(".card-title button")
for button in card_button_tiles:
regex = re.compile(button["id"] + r".*window.open\(['\"](.+?)['\"]")
res = regex.search(str(self._soup))
if not res:
_unexpected_html_warning()
log.warn_contd(f"Could not find click handler target for {button}")
continue
url = self._abs_url_from_relative(res.group(1))
name = _sanitize_path_name(button.getText().strip())
type = self._find_type_from_card(button)
caption_parent = button.findParent(
"div",
attrs={"class": lambda x: x and "caption" in x},
)
caption_container = caption_parent.find_next_sibling("div")
if caption_container:
description = caption_container.getText().strip()
else:
description = None
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {button}")
continue
result.append(IliasPageElement(type, url, name, description=description))
return result
def _find_type_from_card(self, card_title: Tag) -> Optional[IliasElementType]:
def is_card_root(element: Tag) -> bool:
return "il-card" in element["class"] and "thumbnail" in element["class"]
card_root: Optional[Tag] = None
# We look for the card root
for parent in card_title.parents:
if is_card_root(parent):
card_root = parent
break
if card_root is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {card_title}")
return None
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"] or "xoct" in icon["class"]:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
return IliasElementType.LINK
if "book" in icon["class"]:
return IliasElementType.BOOKING
if "frm" in icon["class"]:
return IliasElementType.FORUM
if "sess" in icon["class"]:
return IliasElementType.MEETING
if "tst" in icon["class"]:
return IliasElementType.TEST
if "fold" in icon["class"]:
return IliasElementType.FOLDER
if "copa" in icon["class"]:
return IliasElementType.FOLDER
if "svy" in icon["class"]:
return IliasElementType.SURVEY
if "file" in icon["class"]:
return IliasElementType.FILE
if "mcst" in icon["class"]:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
return None
@staticmethod
def _find_type_from_link(
element_name: str,
@ -930,39 +429,9 @@ class IliasPage:
if "target=file_" in parsed_url.query:
return IliasElementType.FILE
if "target=grp_" in parsed_url.query:
return IliasElementType.FOLDER
if "target=crs_" in parsed_url.query:
return IliasElementType.FOLDER
if "baseClass=ilExerciseHandlerGUI" in parsed_url.query:
return IliasElementType.EXERCISE
if "baseClass=ilLinkResourceHandlerGUI" in parsed_url.query and "calldirectlink" in parsed_url.query:
return IliasElementType.LINK
if "cmd=showThreads" in parsed_url.query or "target=frm_" in parsed_url.query:
return IliasElementType.FORUM
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
if "baseClass=ilLMPresentationGUI" in parsed_url.query:
return IliasElementType.LEARNING_MODULE
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query or "goto.php" in parsed_url.path:
if "ref_id=" in parsed_url.query:
return IliasPage._find_type_from_folder_like(link_element, url)
_unexpected_html_warning()
@ -983,7 +452,7 @@ class IliasPage:
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"] or "il-std-item" in parent["class"]:
if "ilContainerListItemOuter" in parent["class"]:
found_parent = parent
break
@ -995,24 +464,13 @@ class IliasPage:
# Find the small descriptive icon to figure out the type
img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
is_session_expansion_button = found_parent.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content")
return None
if img_tag is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
return IliasElementType.VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
@ -1020,9 +478,6 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.LINK
if str(img_tag["src"]).endswith("icon_book.svg"):
return IliasElementType.BOOKING
if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM
@ -1032,12 +487,6 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_tst.svg"):
return IliasElementType.TEST
if str(img_tag["src"]).endswith("icon_mcst.svg"):
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_sahs.svg"):
return IliasElementType.SCORM_LEARNING_MODULE
return IliasElementType.FOLDER
@staticmethod
@ -1046,66 +495,23 @@ class IliasPage:
Normalizes meeting names, which have a relative time as their first part,
to their date in ISO format.
"""
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
split_delimiter = ", "
# We have a meeting day without time
date_portion_str = meeting_name.split(split_delimiter)[0]
date_portion_str = meeting_name.split(" - ")[0]
date_portion = demangle_date(date_portion_str)
# We failed to parse the date, bail out
if not date_portion:
return meeting_name
# Replace the first section with the absolute date
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
rest_of_name = meeting_name
if rest_of_name.startswith(date_portion_str):
rest_of_name = rest_of_name[len(date_portion_str):]
@staticmethod
def is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name
def _abs_url_from_link(self, link_tag: Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return self._abs_url_from_relative(link_tag.get("href"))
def _abs_url_from_relative(self, relative_url: str) -> str:
"""
Create an absolute url from a relative URL.
"""
return urljoin(self._page_url, relative_url)
return urljoin(self._page_url, link_tag.get("href"))
def _unexpected_html_warning() -> None:
@ -1116,51 +522,38 @@ german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep',
english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[datetime]:
def demangle_date(date_str: str) -> Optional[datetime]:
"""
Demangle a given date in one of the following formats (hour/minute part is optional):
Demangle a given date in one of the following formats:
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon yyyy, HH:MM
"""
try:
# Normalize whitespace because users
date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = date_str.strip()
for german, english in zip(german_months, english_months):
date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english)
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm" or "dd. mmm yyyy"
# Check if we have a time as well
if ", " in date_str:
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm"
day_part, time_part = date_str.split(",")
else:
day_part = date_str.split(",")[0]
time_part = None
day_str, month_str, year_str = day_part.split(" ")
day = int(day_str.strip().replace(".", ""))
month = english_months.index(month_str.strip()) + 1
year = int(year_str.strip())
if time_part:
hour_str, minute_str = time_part.split(":")
hour = int(hour_str)
minute = int(minute_str)
return datetime(year, month, day, hour, minute)
return datetime(year, month, day)
return datetime(year, month, day, hour, minute)
except Exception:
if not fail_silently:
log.warn(f"Date parsing failed for {date_str!r}")
return None
@ -1180,45 +573,3 @@ def _tomorrow() -> date:
def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-").strip()
def parse_ilias_forum_export(forum_export: BeautifulSoup) -> List[IliasForumThread]:
elements = []
for p in forum_export.select("body > p"):
title_tag = p
content_tag = p.find_next_sibling("ul")
if not content_tag:
# ILIAS allows users to delete the initial post while keeping the thread open
# This produces empty threads without *any* content.
# I am not sure why you would want this, but ILIAS makes it easy to do.
continue
title = p.find("b").text
if ":" in title:
title = title[title.find(":") + 1:]
title = title.strip()
mtime = _guess_timestamp_from_forum_post_content(content_tag)
elements.append(IliasForumThread(title, title_tag, content_tag, mtime))
return elements
def _guess_timestamp_from_forum_post_content(content: Tag) -> Optional[datetime]:
posts: Optional[Tag] = content.select(".ilFrmPostHeader > span.small")
if not posts:
return None
newest_date: Optional[datetime] = None
for post in posts:
text = post.text.strip()
text = text[text.rfind("|") + 1:]
date = demangle_date(text, fail_silently=True)
if not date:
continue
if not newest_date or newest_date < date:
newest_date = date
return newest_date

File diff suppressed because it is too large Load Diff

View File

@ -1,170 +0,0 @@
import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
from ..config import Config
from ..logging import ProgressBar, log
from ..output_dir import FileSink
from ..utils import soupify
from .crawler import CrawlError
from .http_crawler import HttpCrawler, HttpCrawlerSection
class KitIpdCrawlerSection(HttpCrawlerSection):
def target(self) -> str:
target = self.s.get("target")
if not target:
self.missing_value("target")
if not target.startswith("https://"):
self.invalid_value("target", target, "Should be a URL")
return target
def link_regex(self) -> Pattern[str]:
regex = self.s.get("link_regex", r"^.*?[^/]+\.(pdf|zip|c|cpp|java)$")
return re.compile(regex)
@dataclass(unsafe_hash=True)
class KitIpdFile:
name: str
url: str
@dataclass
class KitIpdFolder:
name: str
files: List[KitIpdFile]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r} (href={file.url!r})")
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler):
def __init__(
self,
name: str,
section: KitIpdCrawlerSection,
config: Config,
):
super().__init__(name, section, config)
self._url = section.target()
self._file_regex = section.link_regex()
async def _run(self) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
return
tasks: List[Awaitable[None]] = []
async with maybe_cl:
for item in await self._fetch_items():
if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item))
else:
# Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item))
await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
if not await self.crawl(path):
return
tasks = [self._download_file(path, file) for file in folder.files]
await self.gather(tasks)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
element_path = parent / file.name
maybe_dl = await self.download(element_path)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
return items
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link)
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
if resp.content_length:
bar.set_total(resp.content_length)
async for data in resp.content.iter_chunked(1024):
sink.file.write(data)
bar.advance(len(data))
sink.done()
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This
# hack enables those pages to be crawled, and should hopefully not
# cause issues on other pages.
content = (await request.read()).decode("utf-8")
content = re.sub(r"<!--.*?-->", "", content)
return soupify(content.encode("utf-8")), str(request.url)

View File

@ -14,7 +14,7 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:
class Deduplicator:
FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)])
FORBIDDEN_CHARS = '<>:"/\\|?*'
FORBIDDEN_NAMES = {
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
@ -56,12 +56,6 @@ class Deduplicator:
log.explain(f"Changed path to {fmt_path(new_path)} for windows compatibility")
return new_path
def fixup_path(self, path: PurePath) -> PurePath:
"""Fixes up the path for windows, if enabled. Returns the path unchanged otherwise."""
if self._windows_paths:
return self._fixup_for_windows(path)
return path
def mark(self, path: PurePath) -> PurePath:
if self._windows_paths:
path = self._fixup_for_windows(path)

View File

@ -5,7 +5,7 @@ from contextlib import asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, Group
from rich.console import Console, RenderGroup
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
@ -59,7 +59,6 @@ class Log:
# Whether different parts of the output are enabled or disabled
self.output_explain = False
self.output_status = True
self.output_not_deleted = True
self.output_report = True
def _update_live(self) -> None:
@ -69,7 +68,7 @@ class Log:
if self._download_progress.task_ids:
elements.append(self._download_progress)
group = Group(*elements)
group = RenderGroup(*elements) # type: ignore
self._live.update(group)
@contextmanager
@ -198,7 +197,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_explain:
self.print(f" {escape(text)}")
def status(self, style: str, action: str, text: str, suffix: str = "") -> None:
def status(self, style: str, action: str, text: str) -> None:
"""
Print a status update while crawling. Allows markup in the "style"
argument which will be applied to the "action" string.
@ -206,18 +205,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_status:
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def not_deleted(self, style: str, action: str, text: str, suffix: str = "") -> None:
"""
Print a message for a local only file that wasn't
deleted while crawling. Allows markup in the "style"
argument which will be applied to the "action" string.
"""
if self.output_status and self.output_not_deleted:
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
self.print(f"{style}{action}[/] {escape(text)}")
def report(self, text: str) -> None:
"""
@ -227,14 +215,6 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_report:
self.print(text)
def report_not_deleted(self, text: str) -> None:
"""
Print a report for a local only file that wasn't deleted after crawling. Allows markup.
"""
if self.output_report and self.output_not_deleted:
self.print(text)
@contextmanager
def _bar(
self,

View File

@ -44,7 +44,6 @@ class OnConflict(Enum):
LOCAL_FIRST = "local-first"
REMOTE_FIRST = "remote-first"
NO_DELETE = "no-delete"
NO_DELETE_PROMPT_OVERWRITE = "no-delete-prompt-overwrite"
@staticmethod
def from_string(string: str) -> "OnConflict":
@ -52,7 +51,7 @@ class OnConflict(Enum):
return OnConflict(string)
except ValueError:
raise ValueError("must be one of 'prompt', 'local-first',"
" 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'")
" 'remote-first', 'no-delete'")
@dataclass
@ -232,10 +231,7 @@ class OutputDirectory:
stat = local_path.stat()
remote_newer = None
# Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime
if mtime := heuristics.mtime:
remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer:
log.explain("Remote file seems to be newer")
@ -265,7 +261,7 @@ class OutputDirectory:
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
prompt = f"Replace {fmt_path(path)} with remote file?"
return await prompt_yes_no(prompt, default=False)
@ -284,7 +280,7 @@ class OutputDirectory:
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
prompt = f"Recursively delete {fmt_path(path)} and replace with remote file?"
return await prompt_yes_no(prompt, default=False)
@ -304,7 +300,7 @@ class OutputDirectory:
path: PurePath,
parent: PurePath,
) -> bool:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
if on_conflict == OnConflict.PROMPT:
async with log.exclusive_output():
prompt = f"Delete {fmt_path(parent)} so remote file {fmt_path(path)} can be downloaded?"
return await prompt_yes_no(prompt, default=False)
@ -331,7 +327,7 @@ class OutputDirectory:
return False
elif on_conflict == OnConflict.REMOTE_FIRST:
return True
elif on_conflict in {OnConflict.NO_DELETE, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
elif on_conflict == OnConflict.NO_DELETE:
return False
# This should never be reached
@ -496,7 +492,7 @@ class OutputDirectory:
except OSError:
pass
else:
log.not_deleted("[bold bright_magenta]", "Not deleted", fmt_path(pure))
log.status("[bold bright_magenta]", "Not deleted", fmt_path(pure))
self._report.not_delete_file(pure)
def load_prev_report(self) -> None:
@ -504,7 +500,7 @@ class OutputDirectory:
try:
self._prev_report = Report.load(self._report_path)
log.explain("Loaded report successfully")
except (OSError, UnicodeDecodeError, json.JSONDecodeError, ReportLoadError) as e:
except (OSError, json.JSONDecodeError, ReportLoadError) as e:
log.explain("Failed to load report")
log.explain(str(e))

View File

@ -3,9 +3,9 @@ from typing import Dict, List, Optional
from rich.markup import escape
from .auth import AUTHENTICATORS, Authenticator, AuthError, AuthSection
from .auth import AUTHENTICATORS, Authenticator, AuthError
from .config import Config, ConfigOptionError
from .crawl import CRAWLERS, Crawler, CrawlError, CrawlerSection, KitIliasWebCrawler
from .crawl import CRAWLERS, Crawler, CrawlError, KitIliasWebCrawler
from .logging import log
from .utils import fmt_path
@ -15,33 +15,30 @@ class PferdLoadError(Exception):
class Pferd:
def __init__(self, config: Config, cli_crawlers: Optional[List[str]], cli_skips: Optional[List[str]]):
def __init__(self, config: Config, cli_crawlers: Optional[List[str]]):
"""
May throw PferdLoadError.
"""
self._config = config
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers, cli_skips)
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers)
self._authenticators: Dict[str, Authenticator] = {}
self._crawlers: Dict[str, Crawler] = {}
def _find_config_crawlers(self, config: Config) -> List[str]:
crawl_sections = []
for name, section in config.crawl_sections():
if CrawlerSection(section).skip():
log.explain(f"Skipping {name!r}")
else:
crawl_sections.append(name)
def _find_crawlers_to_run(self, config: Config, cli_crawlers: Optional[List[str]]) -> List[str]:
log.explain_topic("Deciding which crawlers to run")
crawl_sections = [name for name, _ in config.crawl_sections()]
if cli_crawlers is None:
log.explain("No crawlers specified on CLI")
log.explain("Running all crawlers specified in config")
return crawl_sections
def _find_cli_crawlers(self, config: Config, cli_crawlers: List[str]) -> List[str]:
if len(cli_crawlers) != len(set(cli_crawlers)):
raise PferdLoadError("Some crawlers were selected multiple times")
crawl_sections = [name for name, _ in config.crawl_sections()]
log.explain("Crawlers specified on CLI")
crawlers_to_run = [] # With crawl: prefix
unknown_names = [] # Without crawl: prefix
@ -65,36 +62,10 @@ class Pferd:
return crawlers_to_run
def _find_crawlers_to_run(
self,
config: Config,
cli_crawlers: Optional[List[str]],
cli_skips: Optional[List[str]],
) -> List[str]:
log.explain_topic("Deciding which crawlers to run")
crawlers: List[str]
if cli_crawlers is None:
log.explain("No crawlers specified on CLI")
log.explain("Running crawlers specified in config")
crawlers = self._find_config_crawlers(config)
else:
log.explain("Crawlers specified on CLI")
crawlers = self._find_cli_crawlers(config, cli_crawlers)
skips = {f"crawl:{name}" for name in cli_skips} if cli_skips else set()
for crawler in crawlers:
if crawler in skips:
log.explain(f"Skipping crawler {crawler!r}")
crawlers = [crawler for crawler in crawlers if crawler not in skips]
return crawlers
def _load_authenticators(self) -> None:
for name, section in self._config.auth_sections():
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
auth_type = AuthSection(section).type()
auth_type = section.get("type")
authenticator_constructor = AUTHENTICATORS.get(auth_type)
if authenticator_constructor is None:
raise ConfigOptionError(name, "type", f"Unknown authenticator type: {auth_type!r}")
@ -109,7 +80,7 @@ class Pferd:
for name, section in self._config.crawl_sections():
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
crawl_type = CrawlerSection(section).type()
crawl_type = section.get("type")
crawler_constructor = CRAWLERS.get(crawl_type)
if crawler_constructor is None:
raise ConfigOptionError(name, "type", f"Unknown crawler type: {crawl_type!r}")
@ -180,15 +151,7 @@ class Pferd:
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files):
something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True
log.report(f" [bold bright_red]Warning[/] {warning}")
for error in crawler.report.encountered_errors:
something_changed = True
log.report(f" [bold bright_red]Error[/] {error}")
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
if not something_changed:
log.report(" Nothing changed")

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Set
class ReportLoadError(Exception):
@ -68,13 +68,6 @@ class Report:
# Files that should have been deleted by the cleanup but weren't
self.not_deleted_files: Set[PurePath] = set()
# Custom crawler-specific data
self.custom: Dict[str, Any] = dict()
# Encountered errors and warnings
self.encountered_warnings: List[str] = []
self.encountered_errors: List[str] = []
@staticmethod
def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
result: Any = data.get(key, [])
@ -88,22 +81,13 @@ class Report:
return result
@staticmethod
def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
result: Dict[str, Any] = data.get(key, {})
if not isinstance(result, dict):
raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")
return result
@classmethod
def load(cls, path: Path) -> "Report":
"""
May raise OSError, UnicodeDecodeError, JsonDecodeError, ReportLoadError.
May raise OSError, JsonDecodeError, ReportLoadError.
"""
with open(path, encoding="utf-8") as f:
with open(path) as f:
data = json.load(f)
if not isinstance(data, dict):
@ -124,9 +108,6 @@ class Report:
self.delete_file(PurePath(elem))
for elem in self._get_list_of_strs(data, "not_deleted"):
self.not_delete_file(PurePath(elem))
self.custom = self._get_str_dictionary(data, "custom")
self.encountered_errors = self._get_list_of_strs(data, "encountered_errors")
self.encountered_warnings = self._get_list_of_strs(data, "encountered_warnings")
return self
@ -143,12 +124,9 @@ class Report:
"changed": [str(path) for path in sorted(self.changed_files)],
"deleted": [str(path) for path in sorted(self.deleted_files)],
"not_deleted": [str(path) for path in sorted(self.not_deleted_files)],
"custom": self.custom,
"encountered_warnings": self.encountered_warnings,
"encountered_errors": self.encountered_errors,
}
with open(path, "w", encoding="utf-8") as f:
with open(path, "w") as f:
json.dump(data, f, indent=2, sort_keys=True)
f.write("\n") # json.dump doesn't do this
@ -212,27 +190,3 @@ class Report:
"""
self.not_deleted_files.add(path)
def add_custom_value(self, key: str, value: Any) -> None:
"""
Adds a custom value under the passed key, overwriting any existing
"""
self.custom[key] = value
def get_custom_value(self, key: str) -> Optional[Any]:
"""
Retrieves a custom value for the given key.
"""
return self.custom.get(key)
def add_error(self, error: str) -> None:
"""
Adds an error to this report's error list.
"""
self.encountered_errors.append(error)
def add_warning(self, warning: str) -> None:
"""
Adds a warning to this report's warning list.
"""
self.encountered_warnings.append(warning)

View File

@ -1,166 +1,151 @@
# I'm sorry that this code has become a bit dense and unreadable. While
# reading, it is important to remember what True and False mean. I'd love to
# have some proper sum-types for the inputs and outputs, they'd make this code
# a lot easier to understand.
import ast
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import PurePath
from typing import Callable, Dict, List, Optional, Sequence, TypeVar, Union
from typing import Dict, Optional, Sequence, Union
from .logging import log
from .utils import fmt_path, str_path
from .utils import fmt_path
class ArrowHead(Enum):
NORMAL = 0
SEQUENCE = 1
class Ignore:
pass
class Empty:
pass
RightSide = Union[str, Ignore, Empty]
@dataclass
class Transformed:
path: PurePath
class Ignored:
pass
TransformResult = Optional[Union[Transformed, Ignored]]
@dataclass
class Rule:
left: str
left_index: int
name: str
head: ArrowHead
right: RightSide
right_index: int
def right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]:
if isinstance(self.right, str):
return self.right
elif isinstance(self.right, Ignore):
return Ignored()
elif isinstance(self.right, Empty):
return Transformed(path)
else:
raise RuntimeError(f"Right side has invalid type {type(self.right)}")
class Transformation(ABC):
def __init__(self, rule: Rule):
self.rule = rule
class Rule(ABC):
@abstractmethod
def transform(self, path: PurePath) -> TransformResult:
def transform(self, path: PurePath) -> Union[PurePath, bool]:
"""
Try to apply this rule to the path. Returns another path if the rule
was successfully applied, True if the rule matched but resulted in an
exclamation mark, and False if the rule didn't match at all.
"""
pass
class ExactTf(Transformation):
def transform(self, path: PurePath) -> TransformResult:
if path != PurePath(self.rule.left):
# These rules all use a Union[T, bool] for their right side. They are passed a
# T if the arrow's right side was a normal string, True if it was an
# exclamation mark and False if it was missing entirely.
class NormalRule(Rule):
def __init__(self, left: PurePath, right: Union[PurePath, bool]):
self._left = left
self._right = right
def _match_prefix(self, path: PurePath) -> Optional[PurePath]:
left_parts = list(reversed(self._left.parts))
path_parts = list(reversed(path.parts))
if len(left_parts) > len(path_parts):
return None
right = self.rule.right_result(path)
if not isinstance(right, str):
return right
while left_parts and path_parts:
left_part = left_parts.pop()
path_part = path_parts.pop()
return Transformed(PurePath(right))
class ExactReTf(Transformation):
def transform(self, path: PurePath) -> TransformResult:
match = re.fullmatch(self.rule.left, str_path(path))
if not match:
if left_part != path_part:
return None
right = self.rule.right_result(path)
if not isinstance(right, str):
return right
if left_parts:
return None
# For some reason, mypy thinks that "groups" has type List[str]. But
# since elements of "match.groups()" can be None, mypy is wrong.
path_parts.reverse()
return PurePath(*path_parts)
def transform(self, path: PurePath) -> Union[PurePath, bool]:
if rest := self._match_prefix(path):
if isinstance(self._right, bool):
return self._right or path
else:
return self._right / rest
return False
class ExactRule(Rule):
def __init__(self, left: PurePath, right: Union[PurePath, bool]):
self._left = left
self._right = right
def transform(self, path: PurePath) -> Union[PurePath, bool]:
if path == self._left:
if isinstance(self._right, bool):
return self._right or path
else:
return self._right
return False
class NameRule(Rule):
def __init__(self, subrule: Rule):
self._subrule = subrule
def transform(self, path: PurePath) -> Union[PurePath, bool]:
matched = False
result = PurePath()
for part in path.parts:
part_result = self._subrule.transform(PurePath(part))
if isinstance(part_result, PurePath):
matched = True
result /= part_result
elif part_result:
# If any subrule call ignores its path segment, the entire path
# should be ignored
return True
else:
# The subrule doesn't modify this segment, but maybe other
# segments
result /= part
if matched:
return result
else:
# The subrule has modified no segments, so this name version of it
# doesn't match
return False
class ReRule(Rule):
def __init__(self, left: str, right: Union[str, bool]):
self._left = left
self._right = right
def transform(self, path: PurePath) -> Union[PurePath, bool]:
if match := re.fullmatch(self._left, str(path)):
if isinstance(self._right, bool):
return self._right or path
vars: Dict[str, Union[str, int, float]] = {}
# For some reason, mypy thinks that "groups" has type List[str].
# But since elements of "match.groups()" can be None, mypy is
# wrong.
groups: Sequence[Optional[str]] = [match[0]] + list(match.groups())
locals_dir: Dict[str, Union[str, int, float]] = {}
for i, group in enumerate(groups):
if group is None:
continue
locals_dir[f"g{i}"] = group
vars[f"g{i}"] = group
try:
locals_dir[f"i{i}"] = int(group)
vars[f"i{i}"] = int(group)
except ValueError:
pass
try:
locals_dir[f"f{i}"] = float(group)
vars[f"f{i}"] = float(group)
except ValueError:
pass
result = eval(f"f{right!r}", {}, locals_dir)
return Transformed(PurePath(result))
result = eval(f"f{self._right!r}", vars)
return PurePath(result)
class RenamingParentsTf(Transformation):
def __init__(self, sub_tf: Transformation):
super().__init__(sub_tf.rule)
self.sub_tf = sub_tf
def transform(self, path: PurePath) -> TransformResult:
for i in range(len(path.parts), -1, -1):
parent = PurePath(*path.parts[:i])
child = PurePath(*path.parts[i:])
transformed = self.sub_tf.transform(parent)
if not transformed:
continue
elif isinstance(transformed, Transformed):
return Transformed(transformed.path / child)
elif isinstance(transformed, Ignored):
return transformed
else:
raise RuntimeError(f"Invalid transform result of type {type(transformed)}: {transformed}")
return None
class RenamingPartsTf(Transformation):
def __init__(self, sub_tf: Transformation):
super().__init__(sub_tf.rule)
self.sub_tf = sub_tf
def transform(self, path: PurePath) -> TransformResult:
result = PurePath()
any_part_matched = False
for part in path.parts:
transformed = self.sub_tf.transform(PurePath(part))
if not transformed:
result /= part
elif isinstance(transformed, Transformed):
result /= transformed.path
any_part_matched = True
elif isinstance(transformed, Ignored):
return transformed
else:
raise RuntimeError(f"Invalid transform result of type {type(transformed)}: {transformed}")
if any_part_matched:
return Transformed(result)
else:
return None
return False
class RuleParseError(Exception):
@ -177,15 +162,18 @@ class RuleParseError(Exception):
log.error_contd(f"{spaces}^--- {self.reason}")
T = TypeVar("T")
class Line:
def __init__(self, line: str, line_nr: int):
self._line = line
self._line_nr = line_nr
self._index = 0
def get(self) -> Optional[str]:
if self._index < len(self._line):
return self._line[self._index]
return None
@property
def line(self) -> str:
return self._line
@ -202,196 +190,155 @@ class Line:
def index(self, index: int) -> None:
self._index = index
@property
def rest(self) -> str:
return self.line[self.index:]
def advance(self) -> None:
self._index += 1
def peek(self, amount: int = 1) -> str:
return self.rest[:amount]
def take(self, amount: int = 1) -> str:
string = self.peek(amount)
self.index += len(string)
return string
def expect(self, string: str) -> str:
if self.peek(len(string)) == string:
return self.take(len(string))
def expect(self, string: str) -> None:
for char in string:
if self.get() == char:
self.advance()
else:
raise RuleParseError(self, f"Expected {string!r}")
def expect_with(self, string: str, value: T) -> T:
self.expect(string)
return value
def one_of(self, parsers: List[Callable[[], T]], description: str) -> T:
for parser in parsers:
index = self.index
try:
return parser()
except RuleParseError:
self.index = index
raise RuleParseError(self, description)
# RULE = LEFT SPACE '-' NAME '-' HEAD (SPACE RIGHT)?
# SPACE = ' '+
# NAME = '' | 'exact' | 'name' | 're' | 'exact-re' | 'name-re'
# HEAD = '>' | '>>'
# LEFT = STR | QUOTED_STR
# RIGHT = STR | QUOTED_STR | '!'
def parse_zero_or_more_spaces(line: Line) -> None:
while line.peek() == " ":
line.take()
def parse_one_or_more_spaces(line: Line) -> None:
line.expect(" ")
parse_zero_or_more_spaces(line)
def parse_str(line: Line) -> str:
result = []
while c := line.peek():
if c == " ":
break
else:
line.take()
result.append(c)
if result:
return "".join(result)
else:
raise RuleParseError(line, "Expected non-space character")
raise RuleParseError(self, f"Expected {char!r}")
QUOTATION_MARKS = {'"', "'"}
def parse_quoted_str(line: Line) -> str:
def parse_string_literal(line: Line) -> str:
escaped = False
# Points to first character of string literal
start_index = line.index
quotation_mark = line.peek()
quotation_mark = line.get()
if quotation_mark not in QUOTATION_MARKS:
raise RuleParseError(line, "Expected quotation mark")
line.take()
# This should never happen as long as this function is only called from
# parse_string.
raise RuleParseError(line, "Invalid quotation mark")
line.advance()
while c := line.peek():
while c := line.get():
if escaped:
escaped = False
line.take()
line.advance()
elif c == quotation_mark:
line.take()
line.advance()
stop_index = line.index
literal = line.line[start_index:stop_index]
try:
return ast.literal_eval(literal)
except SyntaxError as e:
line.index = start_index
raise RuleParseError(line, str(e)) from e
elif c == "\\":
escaped = True
line.take()
line.advance()
else:
line.take()
line.advance()
raise RuleParseError(line, "Expected end of string literal")
def parse_left(line: Line) -> str:
if line.peek() in QUOTATION_MARKS:
return parse_quoted_str(line)
else:
return parse_str(line)
def parse_until_space_or_eol(line: Line) -> str:
result = []
while c := line.get():
if c == " ":
break
result.append(c)
line.advance()
return "".join(result)
def parse_right(line: Line) -> Union[str, Ignore]:
c = line.peek()
if c in QUOTATION_MARKS:
return parse_quoted_str(line)
def parse_string(line: Line) -> Union[str, bool]:
if line.get() in QUOTATION_MARKS:
return parse_string_literal(line)
else:
string = parse_str(line)
string = parse_until_space_or_eol(line)
if string == "!":
return Ignore()
return True
return string
def parse_arrow_name(line: Line) -> str:
return line.one_of([
lambda: line.expect("exact-re"),
lambda: line.expect("exact"),
lambda: line.expect("name-re"),
lambda: line.expect("name"),
lambda: line.expect("re"),
lambda: line.expect(""),
], "Expected arrow name")
def parse_arrow(line: Line) -> str:
line.expect("-")
name = []
while True:
c = line.get()
if not c:
raise RuleParseError(line, "Expected rest of arrow")
elif c == "-":
line.advance()
c = line.get()
if not c:
raise RuleParseError(line, "Expected rest of arrow")
elif c == ">":
line.advance()
break # End of arrow
else:
name.append("-")
continue
else:
name.append(c)
line.advance()
return "".join(name)
def parse_arrow_head(line: Line) -> ArrowHead:
return line.one_of([
lambda: line.expect_with(">>", ArrowHead.SEQUENCE),
lambda: line.expect_with(">", ArrowHead.NORMAL),
], "Expected arrow head")
def parse_whitespace(line: Line) -> None:
line.expect(" ")
while line.get() == " ":
line.advance()
def parse_eol(line: Line) -> None:
if line.peek():
if line.get() is not None:
raise RuleParseError(line, "Expected end of line")
def parse_rule(line: Line) -> Rule:
parse_zero_or_more_spaces(line)
left_index = line.index
left = parse_left(line)
# Parse left side
leftindex = line.index
left = parse_string(line)
if isinstance(left, bool):
line.index = leftindex
raise RuleParseError(line, "Left side can't be '!'")
leftpath = PurePath(left)
parse_one_or_more_spaces(line)
# Parse arrow
parse_whitespace(line)
arrowindex = line.index
arrowname = parse_arrow(line)
line.expect("-")
name = parse_arrow_name(line)
line.expect("-")
head = parse_arrow_head(line)
right_index = line.index
right: RightSide
try:
parse_zero_or_more_spaces(line)
parse_eol(line)
right = Empty()
except RuleParseError:
line.index = right_index
parse_one_or_more_spaces(line)
right = parse_right(line)
parse_eol(line)
return Rule(left, left_index, name, head, right, right_index)
def parse_transformation(line: Line) -> Transformation:
rule = parse_rule(line)
if rule.name == "":
return RenamingParentsTf(ExactTf(rule))
elif rule.name == "exact":
return ExactTf(rule)
elif rule.name == "name":
if len(PurePath(rule.left).parts) > 1:
line.index = rule.left_index
raise RuleParseError(line, "Expected name, not multiple segments")
return RenamingPartsTf(ExactTf(rule))
elif rule.name == "re":
return RenamingParentsTf(ExactReTf(rule))
elif rule.name == "exact-re":
return ExactReTf(rule)
elif rule.name == "name-re":
return RenamingPartsTf(ExactReTf(rule))
# Parse right side
if line.get():
parse_whitespace(line)
right = parse_string(line)
else:
raise RuntimeError(f"Invalid arrow name {rule.name!r}")
right = False
rightpath: Union[PurePath, bool]
if isinstance(right, bool):
rightpath = right
else:
rightpath = PurePath(right)
parse_eol(line)
# Dispatch
if arrowname == "":
return NormalRule(leftpath, rightpath)
elif arrowname == "name":
if len(leftpath.parts) > 1:
line.index = leftindex
raise RuleParseError(line, "SOURCE must be a single name, not multiple segments")
return NameRule(ExactRule(leftpath, rightpath))
elif arrowname == "exact":
return ExactRule(leftpath, rightpath)
elif arrowname == "re":
return ReRule(left, right)
elif arrowname == "name-re":
return NameRule(ReRule(left, right))
else:
line.index = arrowindex + 1 # For nicer error message
raise RuleParseError(line, f"Invalid arrow name {arrowname!r}")
class Transformer:
@ -400,40 +347,32 @@ class Transformer:
May throw a RuleParseException.
"""
self._tfs = []
self._rules = []
for i, line in enumerate(rules.split("\n")):
line = line.strip()
if line:
tf = parse_transformation(Line(line, i))
self._tfs.append((line, tf))
rule = parse_rule(Line(line, i))
self._rules.append((line, rule))
def transform(self, path: PurePath) -> Optional[PurePath]:
for i, (line, tf) in enumerate(self._tfs):
for i, (line, rule) in enumerate(self._rules):
log.explain(f"Testing rule {i+1}: {line}")
try:
result = tf.transform(path)
result = rule.transform(path)
except Exception as e:
log.warn(f"Error while testing rule {i+1}: {line}")
log.warn_contd(str(e))
continue
if not result:
continue
if isinstance(result, Ignored):
if isinstance(result, PurePath):
log.explain(f"Match found, transformed path to {fmt_path(result)}")
return result
elif result: # Exclamation mark
log.explain("Match found, path ignored")
return None
if tf.rule.head == ArrowHead.NORMAL:
log.explain(f"Match found, transformed path to {fmt_path(result.path)}")
path = result.path
break
elif tf.rule.head == ArrowHead.SEQUENCE:
log.explain(f"Match found, updated path to {fmt_path(result.path)}")
path = result.path
else:
raise RuntimeError(f"Invalid transform result of type {type(result)}: {result}")
continue
log.explain(f"Final result: {fmt_path(path)}")
log.explain("No rule matched, path is unchanged")
return path

View File

@ -91,14 +91,8 @@ def url_set_query_params(url: str, params: Dict[str, str]) -> str:
return result
def str_path(path: PurePath) -> str:
if not path.parts:
return "."
return "/".join(path.parts)
def fmt_path(path: PurePath) -> str:
return repr(str_path(path))
return repr(str(path))
def fmt_real_path(path: Path) -> str:

View File

@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.5.0"
VERSION = "3.0.0"

View File

@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip
Ensure you have at least Python 3.9 installed. Run the following command to
Ensure you have at least Python 3.8 installed. Run the following command to
install PFERD or upgrade it to the latest version:
```
@ -26,22 +26,11 @@ $ pip install --upgrade git+https://github.com/Garmelon/PFERD@latest
The use of [venv](https://docs.python.org/3/library/venv.html) is recommended.
### With package managers
Unofficial packages are available for:
- [AUR](https://aur.archlinux.org/packages/pferd)
- [brew](https://formulae.brew.sh/formula/pferd)
- [conda-forge](https://github.com/conda-forge/pferd-feedstock)
- [nixpkgs](https://github.com/NixOS/nixpkgs/blob/master/pkgs/tools/misc/pferd/default.nix)
- [PyPi](https://pypi.org/project/pferd)
See also PFERD's [repology page](https://repology.org/project/pferd/versions).
## Basic usage
PFERD can be run directly from the command line with no config file. Run `pferd
-h` to get an overview of available commands and options. Run `pferd <command>
-h` to see which options a command has.
PFERD can be run directly from the command line with no config file.
Run `pferd -h` to get an overview of available commands and options.
Run `pferd <command> -h` to see which options a command has.
For example, you can download your personal desktop from the KIT ILIAS like
this:
@ -127,18 +116,17 @@ transform =
Online-Tests --> !
Vorlesungswerbung --> !
# Rename folders
Lehrbücher --> Vorlesung
# Note the ">>" arrow head which lets us apply further rules to files moved to "Übung"
Übungsunterlagen -->> Übung
# Move exercises to own folder. Rename them to "Blatt-XX.pdf" to make them sort properly
"Übung/(\d+). Übungsblatt.pdf" -re-> Blätter/Blatt-{i1:02}.pdf
"Übungsunterlagen/(\d+). Übungsblatt.pdf" -re-> Blätter/Blatt-{i1:02}.pdf
# Move solutions to own folder. Rename them to "Blatt-XX-Lösung.pdf" to make them sort properly
"Übung/(\d+). Übungsblatt.*Musterlösung.pdf" -re-> Blätter/Blatt-{i1:02}-Lösung.pdf
"Übungsunterlagen/(\d+). Übungsblatt.*Musterlösung.pdf" -re-> Blätter/Blatt-{i1:02}-Lösung.pdf
# The course has nested folders with the same name - flatten them
"Übung/(.+?)/\\1" -re-> Übung/{g1}
"Übungsunterlagen/(.+?)/\\1/(.*)" -re-> Übung/{g1}/{g2}
# Rename remaining folders
Übungsunterlagen --> Übung
Lehrbücher --> Vorlesung
[crawl:Bar]
type = kit-ilias-web

27
flake.lock generated
View File

@ -1,27 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1694499547,
"narHash": "sha256-R7xMz1Iia6JthWRHDn36s/E248WB1/je62ovC/dUVKI=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e5f018cf150e29aac26c61dac0790ea023c46b24",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

View File

@ -1,41 +0,0 @@
{
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
};
outputs = { self, nixpkgs }:
let
# Helper function to generate an attrset '{ x86_64-linux = f "x86_64-linux"; ... }'.
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed;
in
{
packages = forAllSystems (system:
let pkgs = import nixpkgs { inherit system; };
in
rec {
default = pkgs.python3Packages.buildPythonApplication rec {
pname = "pferd";
# Performing black magic
# Don't worry, I sacrificed enough goats for the next few years
version = (pkgs.lib.importTOML ./PFERD/version.py).VERSION;
format = "pyproject";
src = ./.;
nativeBuildInputs = with pkgs.python3Packages; [
setuptools
];
propagatedBuildInputs = with pkgs.python3Packages; [
aiohttp
beautifulsoup4
rich
keyring
certifi
];
};
});
};
}

View File

@ -12,6 +12,6 @@ pip install --upgrade setuptools
# Installing PFERD itself
pip install --editable .
# Installing tools and type hints
pip install --upgrade mypy flake8 autopep8 isort pyinstaller
pip install --upgrade types-chardet types-certifi
# Installing various tools
pip install --upgrade mypy flake8 autopep8 isort
pip install --upgrade pyinstaller

View File

@ -4,13 +4,13 @@ version = attr: PFERD.version.VERSION
[options]
packages = find:
python_requires = >=3.9
python_requires = >=3.8
install_requires =
aiohttp>=3.8.1
beautifulsoup4>=4.10.0
rich>=11.0.0
keyring>=23.5.0
certifi>=2021.10.8
aiohttp>=3.7.4.post0
beautifulsoup4>=4.9.3
rich>=10.1.0
keyring>=23.0.1
certifi>=2020.12.5
[options.entry_points]
console_scripts =