Compare commits

..

77 Commits

Author SHA1 Message Date
a241672726 Bump version to 3.4.0 2022-05-01 22:29:06 +02:00
a8f76e9be7 Use utf-8 for credential file 2022-04-29 23:15:12 +02:00
b56475450d Use utf-8 for cookies 2022-04-29 23:12:41 +02:00
aa74604d29 Use utf-8 for report 2022-04-29 23:11:27 +02:00
d2e6d91880 Make PFERD executable via python -m 2022-04-27 22:52:50 +02:00
602044ff1b Fix mypy errors and add missing await 2022-04-27 22:52:50 +02:00
31631fb409 Increase minimum python version to 3.9 2022-04-27 22:52:50 +02:00
00db348218 Update changelog 2022-04-27 22:03:52 +02:00
a709280cbf Try to detect unsupported config file encoding
The encoding detection is quite rudimentary, but should detect the
default windows encoding in many cases.
2022-04-27 22:03:47 +02:00
a99ddaa0cc Read and write config in UTF-8 2022-04-27 21:47:51 +02:00
ba3d299c05 Fix changelog 2022-04-27 21:26:24 +02:00
07a21f80a6 Link to unofficial packages 2022-04-27 21:15:33 +02:00
f17b9b68f4 Add shibboleth authentication fix to changelog 2022-04-27 14:01:40 +02:00
a2831fbea2 Fix shib authentication
Authentication failed previously if the shib session was still valid.
If Shibboleth gets a request and the session is still valid, it directly
responds without a second redirect.
2022-04-27 13:55:24 +02:00
da72863b47 Placate newer mypy 2022-04-03 13:19:08 +02:00
86e2e226dc Notify user when shibboleth presents new entitlements 2022-04-03 11:37:08 +02:00
7872fe5221 Fix tables with more columns than expected 2022-01-18 22:38:48 +01:00
86947e4874 Bump version to 3.3.1 2022-01-15 15:11:22 +01:00
4f022e2d19 Reword changelog 2022-01-15 15:06:02 +01:00
f47e7374d2 Use fixed windows path for video cache 2022-01-15 12:00:30 +01:00
57ec51e95a Fix login after shib url parser change 2022-01-14 20:17:27 +01:00
0045124a4e Bump version to 3.3.0 2022-01-09 21:09:09 +01:00
9618aae83b Add content pages to changelog 2022-01-09 18:32:58 +01:00
33453ede2d Update dependency versions in setup.py 2022-01-09 18:31:42 +01:00
e467b38d73 Only reject 1970 timestamps on windows 2022-01-09 18:23:00 +01:00
e9d2d05030 Update changelog 2022-01-09 11:48:26 +01:00
4bf0c972e6 Update types for rich 11 2022-01-09 11:48:26 +01:00
4ee919625d Add rudimentary support for content pages 2022-01-08 20:47:35 +01:00
d30f25ee97 Detect shib login page as login page
And do not assume we are logged in...
2022-01-08 20:28:45 +01:00
10d9d74528 Bail out when crawling recursive courses 2022-01-08 20:28:30 +01:00
43c5453e10 Correctly crawl files on desktop
The files on the desktop do not include a download link, so we need to
rewrite it.
2022-01-08 20:00:53 +01:00
eb4de8ae0c Ignore 1970 dates as windows crashes when calling .timestamp() 2022-01-08 18:14:43 +01:00
e32c1f000f Fix mtime for single streams 2022-01-08 18:05:48 +01:00
5f527bc697 Remove Python 3.9 Pattern typehints 2022-01-08 17:14:40 +01:00
ced8b9a2d0 Fix some accordions 2022-01-08 16:58:30 +01:00
6f3cfd4396 Fix personal desktop crawling 2022-01-08 16:58:15 +01:00
462d993fbc Fix local video path cache (hopefully) 2022-01-08 00:27:48 +01:00
a99356f2a2 Fix video stream extraction 2022-01-08 00:27:34 +01:00
eac2e34161 Fix is_logged_in for ILIAS 7 2022-01-07 23:32:31 +01:00
a82a0b19c2 Collect crawler warnings/errors and include them in the report 2021-11-07 21:48:55 +01:00
90cb6e989b Do not download single videos if cache does not exist 2021-11-06 23:21:15 +01:00
6289938d7c Do not stop crawling files when encountering a CrawlWarning 2021-11-06 12:09:51 +01:00
13b8c3d9c6 Add regex option to config and CLI parser 2021-11-02 09:30:46 +01:00
88afe64a92 Refactor IPD crawler a bit 2021-11-02 01:25:01 +00:00
6b2a657573 Fix IPD crawler for different subpages (#42)
This patch reworks the IPD crawler to support subpages which do not use
"/intern" for links and fetches the folder names from table headings.
2021-11-02 01:25:01 +00:00
d6f38a61e1 Fixed minor spelling mistakes 2021-11-02 01:54:00 +01:00
ad3f4955f7 Update changelog 2021-10-30 18:14:39 +02:00
e42ab83d32 Add support for ILIAS cards 2021-10-30 18:13:44 +02:00
f9a3f9b9f2 Handle multi-stream videos 2021-10-30 18:12:29 +02:00
ef7d5ea2d3 Allow storing crawler-specific data in reports 2021-10-30 18:09:05 +02:00
55ea304ff3 Disable interpolation of ConfigParser 2021-10-25 23:37:42 +02:00
fee12b3d9e Fix changelog 2021-10-25 17:44:12 +00:00
6673077397 Add kit-ipd crawler 2021-10-21 13:20:21 +02:00
742632ed8d Bump version to 3.2.0 2021-08-04 18:27:26 +00:00
544d45cbc5 Catch non-critical exceptions at crawler top level 2021-07-13 15:42:11 +02:00
86f79ff1f1 Update changelog 2021-07-07 15:23:58 +02:00
ee67f9f472 Sort elements by ILIAS id to ensure deterministic ordering 2021-07-06 17:45:48 +02:00
8ec3f41251 Crawl ilias booking objects as links 2021-07-06 16:15:25 +02:00
89be07d4d3 Use final crawl path in HTML parsing message 2021-07-03 17:05:48 +02:00
91200f3684 Fix nondeterministic name deduplication 2021-07-03 12:09:55 +02:00
9ffd603357 Error when using multiple segments with -name->
Previously, PFERD just silently never matched the -name-> arrow. Now, it errors
when loading the config file.
2021-07-01 11:14:50 +02:00
80eeb8fe97 Add --skip option 2021-07-01 11:02:21 +02:00
75fde870c2 Bump version to 3.1.0 2021-06-13 17:23:18 +02:00
6e4d423c81 Crawl all video stages in one crawl bar
This ensures folders are not renamed, as they are crawled twice
2021-06-13 17:18:45 +02:00
57aef26217 Fix name arrows
I seem to have (re-)implemented them incorrectly and never tested them.
2021-06-13 16:33:29 +02:00
70ec64a48b Fix wrong base URL for multi-stage pages 2021-06-13 15:44:47 +02:00
70b33ecfd9 Add migration notes to changelog
Also clean up some other formatting for consistency
2021-06-13 15:06:50 +02:00
601e4b936b Use new arrow logic in README example config 2021-06-12 15:00:52 +02:00
a292c4c437 Add example for ">>" arrow heads 2021-06-12 14:57:29 +02:00
bc65ea7ab6 Fix mypy complaining about missing type hints 2021-06-09 22:45:52 +02:00
f28bbe6b0c Update transform rule documentation
It's still missing an example that uses rules with ">>" arrows.
2021-06-09 22:45:52 +02:00
61d902d715 Overhaul transform logic
-re-> arrows now rename their parent directories (like -->) and don't require a
full match (like -exact->). Their old behaviour is available as -exact-re->.

Also, this change adds the ">>" arrow head, which modifies the current path and
continues to the next rule when it matches.
2021-06-09 22:45:52 +02:00
8ab462fb87 Use the exercise label instead of the button name as path 2021-06-04 19:24:23 +02:00
df3ad3d890 Add 'skip' option to crawlers 2021-06-04 18:47:13 +02:00
fc31100a0f Always use '/' as path separator for regex rules
Previously, regex-matching paths on windows would, in some cases, require four
backslashes ('\\\\') to escape a single path separator. That's just too much.

With this commit, regex transforms now use '/' instead of '\' as path separator,
meaning rules can more easily be shared between platforms (although they are not
guaranteed to be 100% compatible since on Windows, '\' is still recognized as a
path separator).

To make rules more intuitive to write, local relative paths are now also printed
with '/' as path separator on Windows. Since Windows also accepts '/' as path
separator, this change doesn't really affect other rules that parse their sides
as paths.
2021-06-04 18:12:45 +02:00
31b6311e99 Remove incorrect tmp file explain message 2021-06-01 19:03:06 +02:00
1fc8e9eb7a Document credential file authenticator config options 2021-06-01 10:01:14 +00:00
28 changed files with 1495 additions and 456 deletions

View File

@ -14,7 +14,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python: ["3.8"]
python: ["3.9"]
steps:
- uses: actions/checkout@v2

View File

@ -22,6 +22,80 @@ ambiguous situations.
## Unreleased
## 3.4.0 - 2022-05-01
### Added
- Message when Shibboleth entitlements need to be manually reviewed
- Links to unofficial packages and repology in the readme
### Changed
- Increase minimum supported Python version to 3.9
- Support video listings with more columns
- Use UTF-8 when reading/writing the config file
### Fixed
- Crash during authentication when the Shibboleth session is still valid
## 3.3.1 - 2022-01-15
### Fixed
- ILIAS login
- Local video cache if `windows_paths` is enabled
## 3.3.0 - 2022-01-09
### Added
- A KIT IPD crawler
- Support for ILIAS cards
- (Rudimentary) support for content pages
- Support for multi-stream videos
- Support for ILIAS 7
### Removed
- [Interpolation](https://docs.python.org/3/library/configparser.html#interpolation-of-values) in config file
### Fixed
- Crawling of recursive courses
- Crawling files directly placed on the personal desktop
- Ignore timestamps at the unix epoch as they crash on windows
## 3.2.0 - 2021-08-04
### Added
- `--skip` command line option
- Support for ILIAS booking objects
### Changed
- Using multiple path segments on left side of `-name->` now results in an
error. This was already forbidden by the documentation but silently accepted
by PFERD.
- More consistent path printing in some `--explain` messages
### Fixed
- Nondeterministic name deduplication due to ILIAS reordering elements
- More exceptions are handled properly
## 3.1.0 - 2021-06-13
If your config file doesn't do weird things with transforms, it should continue
to work. If your `-re->` arrows behave weirdly, try replacing them with
`-exact-re->` arrows. If you're on Windows, you might need to switch from `\`
path separators to `/` in your regex rules.
### Added
- `skip` option for crawlers
- Rules with `>>` instead of `>` as arrow head
- `-exact-re->` arrow (behaves like `-re->` did previously)
### Changed
- The `-re->` arrow can now rename directories (like `-->`)
- Use `/` instead of `\` as path separator for (regex) rules on Windows
- Use the label to the left for exercises instead of the button name to
determine the folder name
### Fixed
- Video pagination handling in ILIAS crawler
## 3.0.1 - 2021-06-01
### Added

181
CONFIG.md
View File

@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header,
which is followed by a list of `key = value` pairs. Comments must be on their
own line and start with `#`. Multiline values must be indented beyond their key.
Boolean values can be `yes` or `no`. For more details and some examples on the
format, see the [configparser documentation][1] ([basic interpolation][2] is
enabled).
format, see the [configparser documentation][1] ([interpolation][2] is
disabled).
[1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
[2]: <https://docs.python.org/3/library/configparser.html#configparser.BasicInterpolation> "BasicInterpolation"
[2]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
## The `DEFAULT` section
@ -36,7 +36,7 @@ Sections whose names start with `crawl:` are used to configure crawlers. The
rest of the section name specifies the name of the crawler.
A crawler synchronizes a remote resource to a local directory. There are
different types of crawlers for different kinds of resources, e. g. ILIAS
different types of crawlers for different kinds of resources, e.g. ILIAS
courses or lecture websites.
Each crawl section represents an instance of a specific type of crawler. The
@ -49,8 +49,11 @@ see the type's [documentation](#crawler-types) below. The following options are
common to all crawlers:
- `type`: The available types are specified in [this section](#crawler-types).
- `skip`: Whether the crawler should be skipped during normal execution. The
crawler can still be executed manually using the `--crawler` or `-C` flags.
(Default: `no`)
- `output_dir`: The directory the crawler synchronizes files to. A crawler will
never place any files outside of this directory. (Default: the crawler's name)
never place any files outside this directory. (Default: the crawler's name)
- `redownload`: When to download a file that is already present locally.
(Default: `never-smart`)
- `never`: If a file is present locally, it is not downloaded again.
@ -133,6 +136,18 @@ crawler simulate a slower, network-based crawler.
requests. (Default: `0.0`)
- `download_speed`: Download speed (in bytes per second) to simulate. (Optional)
### The `kit-ipd` crawler
This crawler crawls a KIT-IPD page by url. The root page can be crawled from
outside the KIT network so you will be informed about any new/deleted files,
but downloading files requires you to be within. Adding a show delay between
requests is likely a good idea.
- `target`: URL to a KIT-IPD page
- `link_regex`: A regex that is matched against the `href` part of links. If it
matches, the given link is downloaded as a file. This is used to extract
files from KIT-IPD pages. (Default: `^.*/[^/]*\.(?:pdf|zip|c|java)$`)
### The `kit-ilias-web` crawler
This crawler crawls the KIT ILIAS instance.
@ -182,8 +197,11 @@ via the terminal.
### The `credential-file` authenticator
This authenticator reads a username and a password from a credential file. The
credential file has exactly two lines (trailing newline optional). The first
This authenticator reads a username and a password from a credential file.
- `path`: Path to the credential file. (Required)
The credential file has exactly two lines (trailing newline optional). The first
line starts with `username=` and contains the username, the second line starts
with `password=` and contains the password. The username and password may
contain any characters except a line break.
@ -216,63 +234,94 @@ This authenticator does not support usernames.
Transformation rules are rules for renaming and excluding files and directories.
They are specified line-by-line in a crawler's `transform` option. When a
crawler needs to apply a rule to a path, it goes through this list top-to-bottom
and choose the first matching rule.
and applies the first matching rule.
To see this process in action, you can use the `--debug-transforms` or flag or
the `--explain` flag.
Each line has the format `SOURCE ARROW TARGET` where `TARGET` is optional.
`SOURCE` is either a normal path without spaces (e. g. `foo/bar`), or a string
literal delimited by `"` or `'` (e. g. `"foo\" bar/baz"`). Python's string
escape syntax is supported. Trailing slashes are ignored. `TARGET` can be
formatted like `SOURCE`, but it can also be a single exclamation mark without
quotes (`!`). `ARROW` is one of `-->`, `-name->`, `-exact->`, `-re->` and
`-name-re->`
Each rule has the format `SOURCE ARROW TARGET` (e. g. `foo/bar --> foo/baz`).
The arrow specifies how the source and target are interpreted. The different
kinds of arrows are documented below.
If a rule's target is `!`, this means that when the rule matches on a path, the
corresponding file or directory is ignored. If a rule's target is missing, the
path is matched but not modified.
`SOURCE` and `TARGET` are either a bunch of characters without spaces (e. g.
`foo/bar`) or string literals (e. g, `"foo/b a r"`). The former syntax has no
concept of escaping characters, so the backslash is just another character. The
string literals however support Python's escape syntax (e. g.
`"foo\\bar\tbaz"`). This also means that in string literals, backslashes must be
escaped.
`TARGET` can additionally be a single exclamation mark `!` (*not* `"!"`). When a
rule with a `!` as target matches a path, the corresponding file or directory is
ignored by the crawler instead of renamed.
`TARGET` can also be omitted entirely. When a rule without target matches a
path, the path is returned unmodified. This is useful to prevent rules further
down from matching instead.
Each arrow's behaviour can be modified slightly by changing the arrow's head
from `>` to `>>`. When a rule with a `>>` arrow head matches a path, it doesn't
return immediately like a normal arrow. Instead, it replaces the current path
with its output and continues on to the next rule. In effect, this means that
multiple rules can be applied sequentially.
### The `-->` arrow
The `-->` arrow is a basic renaming operation. If a path begins with `SOURCE`,
that part of the path is replaced with `TARGET`. This means that the rule
`foo/bar --> baz` would convert `foo/bar` into `baz`, but also `foo/bar/xyz`
into `baz/xyz`. The rule `foo --> !` would ignore a directory named `foo` as
well as all its contents.
The `-->` arrow is a basic renaming operation for files and directories. If a
path matches `SOURCE`, it is renamed to `TARGET`.
Example: `foo/bar --> baz`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Converts `foo/bar` into `baz`
- Converts `foo/bar/wargl` into `bar/wargl`
Example: `foo/bar --> !`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Ignores `foo/bar` and any of its children
### The `-name->` arrow
The `-name->` arrow lets you rename files and directories by their name,
regardless of where they appear in the file tree. Because of this, its `SOURCE`
must not contain multiple path segments, only a single name. This restriction
does not apply to its `TARGET`. The `-name->` arrow is not applied recursively
to its own output to prevent infinite loops.
does not apply to its `TARGET`.
For example, the rule `foo -name-> bar/baz` would convert `a/foo` into
`a/bar/baz` and `a/foo/b/c/foo` into `a/bar/baz/b/c/bar/baz`. The rule `foo
-name-> !` would ignore all directories and files named `foo`.
Example: `foo -name-> bar/baz`
- Doesn't match `a/foobar/b` or `x/Foo/y/z`
- Converts `hello/foo` into `hello/bar/baz`
- Converts `foo/world` into `bar/baz/world`
- Converts `a/foo/b/c/foo` into `a/bar/baz/b/c/bar/baz`
Example: `foo -name-> !`
- Doesn't match `a/foobar/b` or `x/Foo/y/z`
- Ignores any path containing a segment `foo`
### The `-exact->` arrow
The `-exact->` arrow requires the path to match `SOURCE` exactly. This means
that the rule `foo/bar -exact-> baz` would still convert `foo/bar` into `baz`,
but `foo/bar/xyz` would be unaffected. Also, `foo -exact-> !` would only ignore
`foo`, but not its contents (if it has any). The examples below show why this is
useful.
The `-exact->` arrow requires the path to match `SOURCE` exactly. The examples
below show why this is useful.
Example: `foo/bar -exact-> baz`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Converts `foo/bar` into `baz`
- Doesn't match `foo/bar/wargl`
Example: `foo/bar -exact-> !`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Ignores only `foo/bar`, not its children
### The `-re->` arrow
The `-re->` arrow uses regular expressions. `SOURCE` is a regular expression
that must match the entire path. If this is the case, then the capturing groups
are available in `TARGET` for formatting.
The `-re->` arrow is like the `-->` arrow but with regular expressions. `SOURCE`
is a regular expression and `TARGET` an f-string based template. If a path
matches `SOURCE`, the output path is created using `TARGET` as template.
`SOURCE` is automatically anchored.
`TARGET` uses Python's [format string syntax][3]. The *n*-th capturing group can
be referred to as `{g<n>}` (e. g. `{g3}`). `{g0}` refers to the original path.
be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path.
If capturing group *n*'s contents are a valid integer, the integer value is
available as `{i<n>}` (e. g. `{i3}`). If capturing group *n*'s contents are a
valid float, the float value is available as `{f<n>}` (e. g. `{f3}`). If a
capturing group is not present (e. g. when matching the string `cd` with the
available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a
valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). If a
capturing group is not present (e.g. when matching the string `cd` with the
regex `(ab)?cd`), the corresponding variables are not defined.
Python's format string syntax has rich options for formatting its arguments. For
@ -282,18 +331,37 @@ can use `{i3:05}`.
PFERD even allows you to write entire expressions inside the curly braces, for
example `{g2.lower()}` or `{g3.replace(' ', '_')}`.
Example: `f(oo+)/be?ar -re-> B{g1.upper()}H/fear`
- Doesn't match `a/foo/bar`, `foo/abc/bar`, `afoo/bar` or `foo/bars`
- Converts `foo/bar` into `BOOH/fear`
- Converts `fooooo/bear` into `BOOOOOH/fear`
- Converts `foo/bar/baz` into `BOOH/fear/baz`
[3]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax"
### The `-name-re->` arrow
The `-name-re>` arrow is like a combination of the `-name->` and `-re->` arrows.
Instead of the `SOURCE` being the name of a directory or file, it's a regex that
is matched against the names of directories and files. `TARGET` works like the
`-re->` arrow's target.
For example, the arrow `(.*)\.jpeg -name-re-> {g1}.jpg` will rename all `.jpeg`
extensions into `.jpg`. The arrow `\..+ -name-re-> !` will ignore all files and
directories starting with `.`.
Example: `(.*)\.jpeg -name-re-> {g1}.jpg`
- Doesn't match `foo/bar.png`, `baz.JPEG` or `hello,jpeg`
- Converts `foo/bar.jpeg` into `foo/bar.jpg`
- Converts `foo.jpeg/bar/baz.jpeg` into `foo.jpg/bar/baz.jpg`
Example: `\..+ -name-re-> !`
- Doesn't match `.`, `test`, `a.b`
- Ignores all files and directories starting with `.`.
### The `-exact-re->` arrow
The `-exact-re>` arrow is like a combination of the `-exact->` and `-re->`
arrows.
Example: `f(oo+)/be?ar -exactre-> B{g1.upper()}H/fear`
- Doesn't match `a/foo/bar`, `foo/abc/bar`, `afoo/bar` or `foo/bars`
- Converts `foo/bar` into `BOOH/fear`
- Converts `fooooo/bear` into `BOOOOOH/fear`
- Doesn't match `foo/bar/baz`
### Example: Tutorials
@ -320,8 +388,7 @@ tutorials --> !
The second rule is required for many crawlers since they use the rules to decide
which directories to crawl. If it was missing when the crawler looks at
`tutorials/`, the third rule would match. This means the crawler would not crawl
the `tutorials/` directory and thus not discover that `tutorials/tut02/`
existed.
the `tutorials/` directory and thus not discover that `tutorials/tut02/` exists.
Since the second rule is only relevant for crawling, the `TARGET` is left out.
@ -346,9 +413,9 @@ To do this, you can use the most powerful of arrows: The regex arrow.
Note the escaped backslashes on the `SOURCE` side.
### Example: Crawl a python project
### Example: Crawl a Python project
You are crawling a python project and want to ignore all hidden files (files
You are crawling a Python project and want to ignore all hidden files (files
whose name starts with a `.`), all `__pycache__` directories and all markdown
files (for some weird reason).
@ -368,11 +435,21 @@ README.md
...
```
For this task, the name arrows can be used. They are variants of the normal
arrows that only look at the file name instead of the entire path.
For this task, the name arrows can be used.
```
\..* -name-re-> !
__pycache__ -name-> !
.*\.md -name-re-> !
```
### Example: Clean up names
You want to convert all paths into lowercase and replace spaces with underscores
before applying any rules. This can be achieved using the `>>` arrow heads.
```
(.*) -re->> "{g1.lower().replace(' ', '_')}"
<other rules go here>
```

View File

@ -1,4 +1,5 @@
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim
Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -15,7 +15,7 @@ from .transformer import RuleParseError
def load_config_parser(args: argparse.Namespace) -> configparser.ConfigParser:
log.explain_topic("Loading config")
parser = configparser.ConfigParser()
parser = configparser.ConfigParser(interpolation=None)
if args.command is None:
log.explain("No CLI command specified, loading config from file")
@ -116,7 +116,7 @@ def main() -> None:
sys.exit()
try:
pferd = Pferd(config, args.crawler)
pferd = Pferd(config, args.crawler, args.skip)
except PferdLoadError as e:
log.unlock()
log.error(str(e))
@ -147,7 +147,6 @@ def main() -> None:
log.unlock()
log.explain_topic("Interrupted, exiting immediately")
log.explain("Open files and connections are left for the OS to clean up")
log.explain("Temporary files are not cleaned up")
pferd.print_report()
# TODO Clean up tmp files
# And when those files *do* actually get cleaned up properly,
@ -160,3 +159,7 @@ def main() -> None:
sys.exit(1)
else:
pferd.print_report()
if __name__ == "__main__":
main()

View File

@ -13,7 +13,11 @@ class AuthError(Exception):
class AuthSection(Section):
pass
def type(self) -> str:
value = self.s.get("type")
if value is None:
self.missing_value("type")
return value
class Authenticator(ABC):

View File

@ -20,8 +20,10 @@ class CredentialFileAuthenticator(Authenticator):
path = config.default_section.working_dir() / section.path()
try:
with open(path) as f:
with open(path, encoding="utf-8") as f:
lines = list(f)
except UnicodeDecodeError:
raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8")
except OSError as e:
raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e

View File

@ -9,4 +9,5 @@
from . import command_local # noqa: F401 imported but unused
from . import command_kit_ilias_web # noqa: F401 imported but unused
from . import command_kit_ipd # noqa: F401 imported but unused
from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused

View File

@ -0,0 +1,54 @@
import argparse
import configparser
from pathlib import Path
from ..logging import log
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
SUBPARSER = SUBPARSERS.add_parser(
"kit-ipd",
parents=[CRAWLER_PARSER],
)
GROUP = SUBPARSER.add_argument_group(
title="kit ipd crawler arguments",
description="arguments for the 'kit-ipd' crawler",
)
GROUP.add_argument(
"--link-regex",
type=str,
metavar="REGEX",
help="href-matching regex to identify downloadable files"
)
GROUP.add_argument(
"target",
type=str,
metavar="TARGET",
help="url to crawl"
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain("Creating config for command 'kit-ipd'")
parser["crawl:kit-ipd"] = {}
section = parser["crawl:kit-ipd"]
load_crawler(args, section)
section["type"] = "kit-ipd"
section["target"] = str(args.target)
section["output_dir"] = str(args.output)
if args.link_regex:
section["link_regex"] = str(args.link_regex)
SUBPARSER.set_defaults(command=load)

View File

@ -181,6 +181,14 @@ PARSER.add_argument(
help="only execute a single crawler."
" Can be specified multiple times to execute multiple crawlers"
)
PARSER.add_argument(
"--skip", "-S",
action="append",
type=str,
metavar="NAME",
help="don't execute this particular crawler."
" Can be specified multiple times to skip multiple crawlers"
)
PARSER.add_argument(
"--working-dir",
type=Path,

View File

@ -120,7 +120,7 @@ class Config:
# Using config.read_file instead of config.read because config.read
# would just ignore a missing file and carry on.
try:
with open(path) as f:
with open(path, encoding="utf-8") as f:
parser.read_file(f, source=str(path))
except FileNotFoundError:
raise ConfigLoadError(path, "File does not exist")
@ -128,6 +128,8 @@ class Config:
raise ConfigLoadError(path, "That's a directory, not a file")
except PermissionError:
raise ConfigLoadError(path, "Insufficient permissions")
except UnicodeDecodeError:
raise ConfigLoadError(path, "File is not encoded using UTF-8")
def dump(self, path: Optional[Path] = None) -> None:
"""
@ -154,12 +156,12 @@ class Config:
try:
# x = open for exclusive creation, failing if the file already
# exists
with open(path, "x") as f:
with open(path, "x", encoding="utf-8") as f:
self._parser.write(f)
except FileExistsError:
print("That file already exists.")
if asyncio.run(prompt_yes_no("Overwrite it?", default=False)):
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
self._parser.write(f)
else:
raise ConfigDumpError(path, "File already exists")

View File

@ -3,8 +3,9 @@ from typing import Callable, Dict
from ..auth import Authenticator
from ..config import Config
from .crawler import Crawler, CrawlError # noqa: F401
from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection
CrawlerConstructor = Callable[[
@ -19,4 +20,6 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
LocalCrawler(n, LocalCrawlerSection(s), c),
"kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a:
KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
}

View File

@ -1,9 +1,10 @@
import asyncio
import os
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Coroutine
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from ..auth import Authenticator
from ..config import Config, Section
@ -47,16 +48,18 @@ def noncritical(f: Wrapped) -> Wrapped:
try:
f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
crawler.report.add_warning(str(e))
log.warn(str(e))
crawler.error_free = False
except: # noqa: E722 do not use bare 'except'
except Exception as e:
crawler.error_free = False
crawler.report.add_error(str(e))
raise
return wrapper # type: ignore
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]])
AWrapped = TypeVar("AWrapped", bound=Callable[..., Coroutine[Any, Any, Optional[Any]]])
def anoncritical(f: AWrapped) -> AWrapped:
@ -72,21 +75,25 @@ def anoncritical(f: AWrapped) -> AWrapped:
Warning: Must only be applied to member functions of the Crawler class!
"""
async def wrapper(*args: Any, **kwargs: Any) -> None:
async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
if not (args and isinstance(args[0], Crawler)):
raise RuntimeError("@anoncritical must only applied to Crawler methods")
crawler = args[0]
try:
await f(*args, **kwargs)
return await f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
log.warn(str(e))
crawler.error_free = False
except: # noqa: E722 do not use bare 'except'
crawler.report.add_warning(str(e))
except Exception as e:
crawler.error_free = False
crawler.report.add_error(str(e))
raise
return None
return wrapper # type: ignore
@ -132,6 +139,15 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
class CrawlerSection(Section):
def type(self) -> str:
value = self.s.get("type")
if value is None:
self.missing_value("type")
return value
def skip(self) -> bool:
return self.s.getboolean("skip", fallback=False)
def output_dir(self, name: str) -> Path:
# TODO Use removeprefix() after switching to 3.9
if name.startswith("crawl:"):
@ -309,6 +325,7 @@ class Crawler(ABC):
log.explain("Warnings or errors occurred during this run")
log.explain("Answer: No")
@anoncritical
async def run(self) -> None:
"""
Start the crawling process. Call this function if you want to use a

View File

@ -108,7 +108,7 @@ class HttpCrawler(Crawler):
def _load_cookies_from_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
with open(path) as f:
with open(path, encoding="utf-8") as f:
for i, line in enumerate(f):
# Names of headers are case insensitive
if line[:11].lower() == "set-cookie:":
@ -121,7 +121,7 @@ class HttpCrawler(Crawler):
jar: Any = http.cookies.SimpleCookie()
for morsel in self._cookie_jar:
jar[morsel.key] = morsel
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
f.write(jar.output(sep="\n"))
f.write("\n") # A trailing newline is just common courtesy

View File

@ -22,6 +22,7 @@ class IliasElementType(Enum):
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
BOOKING = "booking"
MEETING = "meeting"
VIDEO = "video"
VIDEO_PLAYER = "video_player"
@ -37,6 +38,22 @@ class IliasPageElement:
mtime: Optional[datetime] = None
description: Optional[str] = None
def id(self) -> str:
regexes = [
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)"
]
for regex in regexes:
if match := re.search(regex, self.url):
return match.groupdict()["id"]
# Fall back to URL
log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
return self.url
class IliasPage:
@ -59,12 +76,20 @@ class IliasPage:
if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries()
if self._is_personal_desktop():
log.explain("Page is the personal desktop, searching for elements")
return self._find_personal_desktop_entries()
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries()
def get_next_stage_url(self) -> Optional[str]:
def get_next_stage_element(self) -> Optional[IliasPageElement]:
if self._is_ilias_opencast_embedding():
return self.get_child_elements()[0].url
return self.get_child_elements()[0]
if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED:
return self._find_video_entries_paginated()[0]
return None
def _is_video_player(self) -> bool:
@ -101,13 +126,22 @@ class IliasPage:
return False
def _is_personal_desktop(self) -> bool:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
def _is_content_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=copa_" in link.get("value")
def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex: re.Pattern[str] = re.compile(
regex = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(self._soup))
@ -119,9 +153,63 @@ class IliasPage:
# parse it
json_object = json.loads(json_str)
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
streams = [stream for stream in json_object["streams"]]
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
return items
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select(".il-item-title")
for title in titles:
link = title.find("a")
name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link)
type = self._find_type_from_link(name, link, url)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {link}")
continue
log.explain(f"Found {name!r}")
if type == IliasElementType.FILE and "_download" not in url:
url = re.sub(r"(target=file_\d+)", r"\1_download", url)
log.explain("Rewired file URL to include download part")
items.append(IliasPageElement(type, url, name))
return items
def _find_copa_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink")
for link in links:
url = self._abs_url_from_link(link)
name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
if "file_id" not in url:
_unexpected_html_warning()
log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
continue
items.append(IliasPageElement(IliasElementType.FILE, url, name))
return items
def _find_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
@ -192,11 +280,22 @@ class IliasPage:
def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)"
).getText().strip()
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
modification_time = None
row: Tag = link.parent.parent.parent
column_count = len(row.select("td.std"))
for index in range(column_count, 0, -1):
modification_string = link.parent.parent.parent.select_one(
f"td.std:nth-child({index})"
).getText().strip()
if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
break
if modification_time is None:
log.warn(f"Could not determine upload time for {link}")
modification_time = datetime.now()
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
title += ".mp4"
@ -293,7 +392,13 @@ class IliasPage:
# Add each listing as a new
for listing in file_listings:
file_name = _sanitize_path_name(listing.getText().strip())
parent_container: Tag = listing.findParent(
"div", attrs={"class": lambda x: x and "form-group" in x}
)
label_container: Tag = parent_container.find(
attrs={"class": lambda x: x and "control-label" in x}
)
file_name = _sanitize_path_name(label_container.getText().strip())
url = self._abs_url_from_link(listing)
log.explain(f"Found exercise detail {file_name!r} at {url}")
results.append(IliasPageElement(
@ -336,6 +441,8 @@ class IliasPage:
log.explain(f"Found {element_name!r}")
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
return result
def _find_upwards_folder_hierarchy(self, tag: Tag) -> List[str]:
@ -363,7 +470,10 @@ class IliasPage:
continue
prev: Tag = parent.findPreviousSibling("div")
if "ilContainerBlockHeader" in prev.get("class"):
found_titles.append(prev.find("h3").getText().strip())
if prev.find("h3"):
found_titles.append(prev.find("h3").getText().strip())
else:
found_titles.append(prev.find("h2").getText().strip())
# And this for real accordions
if "il_VAccordionContentDef" in parent.get("class"):
@ -418,6 +528,90 @@ class IliasPage:
log.explain(f"Found file {full_path!r}")
return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
def _find_cards(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
card_titles: List[Tag] = self._soup.select(".card-title a")
for title in card_titles:
url = self._abs_url_from_link(title)
name = _sanitize_path_name(title.getText().strip())
type = self._find_type_from_card(title)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {title}")
continue
result.append(IliasPageElement(type, url, name))
card_button_tiles: List[Tag] = self._soup.select(".card-title button")
for button in card_button_tiles:
regex = re.compile(button["id"] + r".*window.open\(['\"](.+?)['\"]")
res = regex.search(str(self._soup))
if not res:
_unexpected_html_warning()
log.warn_contd(f"Could not find click handler target for {button}")
continue
url = self._abs_url_from_relative(res.group(1))
name = _sanitize_path_name(button.getText().strip())
type = self._find_type_from_card(button)
caption_parent = button.findParent(
"div",
attrs={"class": lambda x: x and "caption" in x},
)
description = caption_parent.find_next_sibling("div").getText().strip()
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {button}")
continue
result.append(IliasPageElement(type, url, name, description=description))
return result
def _find_type_from_card(self, card_title: Tag) -> Optional[IliasElementType]:
def is_card_root(element: Tag) -> bool:
return "il-card" in element["class"] and "thumbnail" in element["class"]
card_root: Optional[Tag] = None
# We look for the card root
for parent in card_title.parents:
if is_card_root(parent):
card_root = parent
break
if card_root is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {card_title}")
return None
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"]:
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
return IliasElementType.LINK
if "book" in icon["class"]:
return IliasElementType.BOOKING
if "frm" in icon["class"]:
return IliasElementType.FORUM
if "sess" in icon["class"]:
return IliasElementType.MEETING
if "tst" in icon["class"]:
return IliasElementType.TEST
if "fold" in icon["class"]:
return IliasElementType.FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
return None
@staticmethod
def _find_type_from_link(
element_name: str,
@ -433,9 +627,30 @@ class IliasPage:
if "target=file_" in parsed_url.query:
return IliasElementType.FILE
if "target=grp_" in parsed_url.query:
return IliasElementType.FOLDER
if "target=crs_" in parsed_url.query:
return IliasElementType.FOLDER
if "baseClass=ilExerciseHandlerGUI" in parsed_url.query:
return IliasElementType.EXERCISE
if "baseClass=ilLinkResourceHandlerGUI" in parsed_url.query and "calldirectlink" in parsed_url.query:
return IliasElementType.LINK
if "cmd=showThreads" in parsed_url.query or "target=frm_" in parsed_url.query:
return IliasElementType.FORUM
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query:
if "ref_id=" in parsed_url.query or "goto.php" in parsed_url.path:
return IliasPage._find_type_from_folder_like(link_element, url)
_unexpected_html_warning()
@ -456,7 +671,7 @@ class IliasPage:
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
if "ilContainerListItemOuter" in parent["class"] or "il-std-item" in parent["class"]:
found_parent = parent
break
@ -468,13 +683,16 @@ class IliasPage:
# Find the small descriptive icon to figure out the type
img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
if img_tag is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
@ -482,6 +700,9 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.LINK
if str(img_tag["src"]).endswith("icon_book.svg"):
return IliasElementType.BOOKING
if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM
@ -515,7 +736,13 @@ class IliasPage:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._page_url, link_tag.get("href"))
return self._abs_url_from_relative(link_tag.get("href"))
def _abs_url_from_relative(self, relative_url: str) -> str:
"""
Create an absolute url from a relative URL.
"""
return urljoin(self._page_url, relative_url)
def _unexpected_html_warning() -> None:

View File

@ -1,9 +1,11 @@
import asyncio
import re
from collections.abc import Awaitable, Coroutine
from pathlib import PurePath
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, TypeVar, Union
from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
import aiohttp
import yarl
from aiohttp import hdrs
from bs4 import BeautifulSoup, Tag
@ -12,7 +14,7 @@ from ...config import Config
from ...logging import ProgressBar, log
from ...output_dir import FileSink, Redownload
from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import CrawlError, CrawlWarning, anoncritical
from ..crawler import AWrapped, CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .file_templates import Links
from .kit_ilias_html import IliasElementType, IliasPage, IliasPageElement
@ -81,17 +83,14 @@ _VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]])
def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
def decorator(f: AWrapped) -> AWrapped:
async def wrapper(*args: Any, **kwargs: Any) -> None:
async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
last_exception: Optional[BaseException] = None
for round in range(attempts):
try:
await f(*args, **kwargs)
return
return await f(*args, **kwargs)
except aiohttp.ContentTypeError: # invalid content type
raise CrawlWarning("ILIAS returned an invalid content type")
except aiohttp.TooManyRedirects:
@ -106,7 +105,10 @@ def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
if last_exception:
message = f"Error in I/O Operation: {last_exception}"
raise CrawlWarning(message) from last_exception
if failure_is_error:
raise CrawlError(message) from last_exception
else:
raise CrawlWarning(message) from last_exception
raise CrawlError("Impossible return in ilias _iorepeat")
return wrapper # type: ignore
@ -180,6 +182,7 @@ instance's greatest bottleneck.
self._link_file_redirect_delay = section.link_redirect_delay()
self._links = section.links()
self._videos = section.videos()
self._visited_urls: Set[str] = set()
async def _run(self) -> None:
if isinstance(self._target, int):
@ -201,7 +204,9 @@ instance's greatest bottleneck.
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
await self._crawl_url(self._base_url)
appendix = r"ILIAS\PersonalDesktop\PDMainBarProvider|mm_pd_sel_items"
appendix = appendix.encode("ASCII").hex()
await self._crawl_url(self._base_url + "/gs_content.php?item=" + appendix)
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath("."))
@ -230,17 +235,35 @@ instance's greatest bottleneck.
# Fill up our task list with the found elements
await gather_elements()
tasks = [self._handle_ilias_element(PurePath("."), element) for element in elements]
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
async def _handle_ilias_page(self, url: str, parent: IliasPageElement, path: PurePath) -> None:
async def _handle_ilias_page(
self,
url: str,
parent: IliasPageElement,
path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
return None
return self._crawl_ilias_page(url, parent, maybe_cl)
@anoncritical
async def _crawl_ilias_page(
self,
url: str,
parent: IliasPageElement,
cl: CrawlToken,
) -> None:
elements: List[IliasPageElement] = []
@_iorepeat(3, "crawling folder")
@ -248,28 +271,51 @@ instance's greatest bottleneck.
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = parent
while next_stage_url:
soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(path)}")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, url, parent)
next_stage_url = page.get_next_stage_url()
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
# Fill up our task list with the found elements
await gather_elements()
tasks = [self._handle_ilias_element(cl.path, element) for element in elements]
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
# These decorators only apply *to this method* and *NOT* to the returned
# awaitables!
# This method does not await the handlers but returns them instead.
# This ensures one level is handled at a time and name deduplication
# works correctly.
@anoncritical
# Shouldn't happen but we also really don't want to let I/O errors bubble up to anoncritical.
# If that happens we will be terminated as anoncritical doesn't tream them as non-critical.
@_wrap_io_in_warning("handling ilias element")
async def _handle_ilias_element(self, parent_path: PurePath, element: IliasPageElement) -> None:
async def _handle_ilias_element(
self,
parent_path: PurePath,
element: IliasPageElement,
) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. Aborting subpath"
)
self._visited_urls.add(element.url)
element_path = PurePath(parent_path, element.name)
if element.type in _VIDEO_ELEMENTS:
@ -277,35 +323,43 @@ instance's greatest bottleneck.
if not self._videos:
log.explain("Video crawling is disabled")
log.explain("Answer: no")
return
return None
else:
log.explain("Video crawling is enabled")
log.explain("Answer: yes")
if element.type == IliasElementType.FILE:
await self._download_file(element, element_path)
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.FORUM:
log.explain_topic(f"Decision: Crawl {fmt_path(element_path)}")
log.explain("Forums are not supported")
log.explain("Answer: No")
return None
elif element.type == IliasElementType.TEST:
log.explain_topic(f"Decision: Crawl {fmt_path(element_path)}")
log.explain("Tests contain no relevant files")
log.explain("Answer: No")
return None
elif element.type == IliasElementType.LINK:
await self._download_link(element, element_path)
return await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.VIDEO:
await self._download_file(element, element_path)
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER:
await self._download_video(element, element_path)
return await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES:
await self._handle_ilias_page(element.url, element, element_path)
return await self._handle_ilias_page(element.url, element, element_path)
else:
# This will retry it a few times, failing everytime. It doesn't make any network
# requests, so that's fine.
raise CrawlWarning(f"Unknown element type: {element.type!r}")
async def _download_link(self, element: IliasPageElement, element_path: PurePath) -> None:
async def _handle_link(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
@ -313,32 +367,74 @@ instance's greatest bottleneck.
link_extension = self._links.extension()
if not link_template_maybe or not link_extension:
log.explain("Answer: No")
return
return None
else:
log.explain("Answer: Yes")
link_template = link_template_maybe
element_path = element_path.with_name(element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return
dl = maybe_dl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
return None
@_iorepeat(3, "resolving link")
async def impl() -> None:
async with dl as (bar, sink):
export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML")
real_url = await self._resolve_link_target(export_url)
return self._download_link(element, link_template_maybe, maybe_dl)
content = link_template
content = content.replace("{{link}}", real_url)
content = content.replace("{{name}}", element.name)
content = content.replace("{{description}}", str(element.description))
content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay))
sink.file.write(content.encode("utf-8"))
sink.done()
@anoncritical
@_iorepeat(3, "resolving link")
async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None:
async with dl as (bar, sink):
export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML")
real_url = await self._resolve_link_target(export_url)
self._write_link_content(link_template, real_url, element.name, element.description, sink)
await impl()
def _write_link_content(
self,
link_template: str,
url: str,
name: str,
description: Optional[str],
sink: FileSink,
) -> None:
content = link_template
content = content.replace("{{link}}", url)
content = content.replace("{{name}}", name)
content = content.replace("{{description}}", str(description))
content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay))
sink.file.write(content.encode("utf-8"))
sink.done()
async def _handle_booking(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
log.explain_topic(f"Decision: Crawl Booking Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
link_template_maybe = self._links.template()
link_extension = self._links.extension()
if not link_template_maybe or not link_extension:
log.explain("Answer: No")
return None
else:
log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(3, "resolving booking")
async def _download_booking(
self,
element: IliasPageElement,
link_template: str,
dl: DownloadToken,
) -> None:
async with dl as (bar, sink):
self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str:
async with self.session.get(export_url, allow_redirects=False) as resp:
@ -346,7 +442,7 @@ instance's greatest bottleneck.
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
self._authenticate()
await self._authenticate()
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
@ -355,39 +451,130 @@ instance's greatest bottleneck.
raise CrawlError("resolve_link_target failed even after authenticating")
async def _download_video(self, element: IliasPageElement, element_path: PurePath) -> None:
# Videos will NOT be redownloaded - their content doesn't really change and they are chunky
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
return
dl = maybe_dl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
async def _handle_video(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
# Copy old mapping as it is likely still relevant
if self.prev_report:
self.report.add_custom_value(
str(element_path),
self.prev_report.get_custom_value(str(element_path))
)
@_iorepeat(3, "downloading video")
async def impl() -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
real_element = page.get_child_elements()[0]
# A video might contain other videos, so let's "crawl" the video first
# to ensure rate limits apply. This must be a download as *this token*
# is re-used if the video consists of a single stream. In that case the
# file name is used and *not* the stream name the ilias html parser reported
# to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
log.explain(f"Streaming video from real url {real_element.url}")
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# during dleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
await self.download(video)
await self._stream_from_url(real_element.url, sink, bar, is_video=True)
return None
await impl()
return self._download_video(element_path, element, maybe_dl)
async def _download_file(self, element: IliasPageElement, element_path: PurePath) -> None:
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(str(video_path))
if not custom_value:
return []
names = cast(List[str], custom_value)
folder = video_path.parent
return [PurePath(folder, name) for name in names]
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._to_local_video_path(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
return True
log.explain("Missing at least one video, continuing with requests!")
return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical
@_iorepeat(3, "downloading video")
async def _download_video(
self,
original_path: PurePath,
element: IliasPageElement,
dl: DownloadToken
) -> None:
stream_elements: List[IliasPageElement] = []
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
stream_elements = page.get_child_elements()
if len(stream_elements) > 1:
log.explain(f"Found multiple video streams for {element.name}")
else:
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
transformed_path = self._to_local_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
# We do not have a local cache yet
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), [original_path.name])
return
contained_video_paths: List[str] = []
for stream_element in stream_elements:
video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
continue
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), contained_video_paths)
async def _handle_file(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return
dl = maybe_dl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
return None
return self._download_file(element, maybe_dl)
@_iorepeat(3, "downloading file")
async def impl() -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False)
await impl()
@anoncritical
@_iorepeat(3, "downloading file")
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False)
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool:
@ -438,16 +625,23 @@ instance's greatest bottleneck.
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login")
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
userlog = soup.find("li", {"id": "userlog"})
if userlog is not None:
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find("button", attrs={"data-action": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
@ -485,14 +679,14 @@ class KitShibbolethLogin:
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
url = "https://ilias.studium.kit.edu/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: BeautifulSoup = await _post(sess, url, data)
soup: BeautifulSoup = await _shib_post(sess, url, data)
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
@ -515,6 +709,12 @@ class KitShibbolethLogin:
}
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup)
@ -572,3 +772,37 @@ class KitShibbolethLogin:
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
async def _shib_post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
"""
aiohttp unescapes '/' and ':' in URL query parameters which is not RFC compliant and rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling location header was a good idea??
"""
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
# If shib still still has a valid session, it will directly respond to the request
if location is None:
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())

View File

@ -0,0 +1,164 @@
import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
from ..config import Config
from ..logging import ProgressBar, log
from ..output_dir import FileSink
from ..utils import soupify
from .crawler import CrawlError
from .http_crawler import HttpCrawler, HttpCrawlerSection
class KitIpdCrawlerSection(HttpCrawlerSection):
def target(self) -> str:
target = self.s.get("target")
if not target:
self.missing_value("target")
if not target.startswith("https://"):
self.invalid_value("target", target, "Should be a URL")
return target
def link_regex(self) -> Pattern[str]:
regex = self.s.get("link_regex", r"^.*/[^/]*\.(?:pdf|zip|c|java)$")
return re.compile(regex)
@dataclass(unsafe_hash=True)
class KitIpdFile:
name: str
url: str
@dataclass
class KitIpdFolder:
name: str
files: List[KitIpdFile]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r}")
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler):
def __init__(
self,
name: str,
section: KitIpdCrawlerSection,
config: Config,
):
super().__init__(name, section, config)
self._url = section.target()
self._file_regex = section.link_regex()
async def _run(self) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
return
tasks: List[Awaitable[None]] = []
async with maybe_cl:
for item in await self._fetch_items():
if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item))
else:
# Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item))
await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
if not await self.crawl(path):
return
tasks = [self._download_file(path, file) for file in folder.files]
await self.gather(tasks)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
element_path = parent / file.name
maybe_dl = await self.download(element_path)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r}")
log.explain("Attributing it to root folder")
return items
def _extract_folder(self, folder_tag: Tag) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link))
return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag) -> KitIpdFile:
url = self._abs_url_from_link(link)
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _abs_url_from_link(self, link_tag: Tag) -> str:
return urljoin(self._url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
if resp.content_length:
bar.set_total(resp.content_length)
async for data in resp.content.iter_chunked(1024):
sink.file.write(data)
bar.advance(len(data))
sink.done()
async def get_page(self) -> BeautifulSoup:
async with self.session.get(self._url) as request:
return soupify(await request.read())

View File

@ -56,6 +56,12 @@ class Deduplicator:
log.explain(f"Changed path to {fmt_path(new_path)} for windows compatibility")
return new_path
def fixup_path(self, path: PurePath) -> PurePath:
"""Fixes up the path for windows, if enabled. Returns the path unchanged otherwise."""
if self._windows_paths:
return self._fixup_for_windows(path)
return path
def mark(self, path: PurePath) -> PurePath:
if self._windows_paths:
path = self._fixup_for_windows(path)

View File

@ -5,7 +5,7 @@ from contextlib import asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, RenderGroup
from rich.console import Console, Group
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
@ -68,7 +68,7 @@ class Log:
if self._download_progress.task_ids:
elements.append(self._download_progress)
group = RenderGroup(*elements) # type: ignore
group = Group(*elements)
self._live.update(group)
@contextmanager

View File

@ -231,7 +231,10 @@ class OutputDirectory:
stat = local_path.stat()
remote_newer = None
if mtime := heuristics.mtime:
# Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer:
log.explain("Remote file seems to be newer")
@ -500,7 +503,7 @@ class OutputDirectory:
try:
self._prev_report = Report.load(self._report_path)
log.explain("Loaded report successfully")
except (OSError, json.JSONDecodeError, ReportLoadError) as e:
except (OSError, UnicodeDecodeError, json.JSONDecodeError, ReportLoadError) as e:
log.explain("Failed to load report")
log.explain(str(e))

View File

@ -3,9 +3,9 @@ from typing import Dict, List, Optional
from rich.markup import escape
from .auth import AUTHENTICATORS, Authenticator, AuthError
from .auth import AUTHENTICATORS, Authenticator, AuthError, AuthSection
from .config import Config, ConfigOptionError
from .crawl import CRAWLERS, Crawler, CrawlError, KitIliasWebCrawler
from .crawl import CRAWLERS, Crawler, CrawlError, CrawlerSection, KitIliasWebCrawler
from .logging import log
from .utils import fmt_path
@ -15,30 +15,33 @@ class PferdLoadError(Exception):
class Pferd:
def __init__(self, config: Config, cli_crawlers: Optional[List[str]]):
def __init__(self, config: Config, cli_crawlers: Optional[List[str]], cli_skips: Optional[List[str]]):
"""
May throw PferdLoadError.
"""
self._config = config
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers)
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers, cli_skips)
self._authenticators: Dict[str, Authenticator] = {}
self._crawlers: Dict[str, Crawler] = {}
def _find_crawlers_to_run(self, config: Config, cli_crawlers: Optional[List[str]]) -> List[str]:
log.explain_topic("Deciding which crawlers to run")
crawl_sections = [name for name, _ in config.crawl_sections()]
def _find_config_crawlers(self, config: Config) -> List[str]:
crawl_sections = []
if cli_crawlers is None:
log.explain("No crawlers specified on CLI")
log.explain("Running all crawlers specified in config")
return crawl_sections
for name, section in config.crawl_sections():
if CrawlerSection(section).skip():
log.explain(f"Skipping {name!r}")
else:
crawl_sections.append(name)
return crawl_sections
def _find_cli_crawlers(self, config: Config, cli_crawlers: List[str]) -> List[str]:
if len(cli_crawlers) != len(set(cli_crawlers)):
raise PferdLoadError("Some crawlers were selected multiple times")
log.explain("Crawlers specified on CLI")
crawl_sections = [name for name, _ in config.crawl_sections()]
crawlers_to_run = [] # With crawl: prefix
unknown_names = [] # Without crawl: prefix
@ -62,10 +65,36 @@ class Pferd:
return crawlers_to_run
def _find_crawlers_to_run(
self,
config: Config,
cli_crawlers: Optional[List[str]],
cli_skips: Optional[List[str]],
) -> List[str]:
log.explain_topic("Deciding which crawlers to run")
crawlers: List[str]
if cli_crawlers is None:
log.explain("No crawlers specified on CLI")
log.explain("Running crawlers specified in config")
crawlers = self._find_config_crawlers(config)
else:
log.explain("Crawlers specified on CLI")
crawlers = self._find_cli_crawlers(config, cli_crawlers)
skips = {f"crawl:{name}" for name in cli_skips} if cli_skips else set()
for crawler in crawlers:
if crawler in skips:
log.explain(f"Skipping crawler {crawler!r}")
crawlers = [crawler for crawler in crawlers if crawler not in skips]
return crawlers
def _load_authenticators(self) -> None:
for name, section in self._config.auth_sections():
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
auth_type = section.get("type")
auth_type = AuthSection(section).type()
authenticator_constructor = AUTHENTICATORS.get(auth_type)
if authenticator_constructor is None:
raise ConfigOptionError(name, "type", f"Unknown authenticator type: {auth_type!r}")
@ -80,7 +109,7 @@ class Pferd:
for name, section in self._config.crawl_sections():
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
crawl_type = section.get("type")
crawl_type = CrawlerSection(section).type()
crawler_constructor = CRAWLERS.get(crawl_type)
if crawler_constructor is None:
raise ConfigOptionError(name, "type", f"Unknown crawler type: {crawl_type!r}")
@ -153,5 +182,13 @@ class Pferd:
something_changed = True
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True
log.report(f" [bold bright_red]Warning[/] {warning}")
for error in crawler.report.encountered_errors:
something_changed = True
log.report(f" [bold bright_red]Error[/] {error}")
if not something_changed:
log.report(" Nothing changed")

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path, PurePath
from typing import Any, Dict, List, Set
from typing import Any, Dict, List, Optional, Set
class ReportLoadError(Exception):
@ -68,6 +68,13 @@ class Report:
# Files that should have been deleted by the cleanup but weren't
self.not_deleted_files: Set[PurePath] = set()
# Custom crawler-specific data
self.custom: Dict[str, Any] = dict()
# Encountered errors and warnings
self.encountered_warnings: List[str] = []
self.encountered_errors: List[str] = []
@staticmethod
def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
result: Any = data.get(key, [])
@ -81,13 +88,22 @@ class Report:
return result
@staticmethod
def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
result: Dict[str, Any] = data.get(key, {})
if not isinstance(result, dict):
raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")
return result
@classmethod
def load(cls, path: Path) -> "Report":
"""
May raise OSError, JsonDecodeError, ReportLoadError.
May raise OSError, UnicodeDecodeError, JsonDecodeError, ReportLoadError.
"""
with open(path) as f:
with open(path, encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
@ -108,6 +124,9 @@ class Report:
self.delete_file(PurePath(elem))
for elem in self._get_list_of_strs(data, "not_deleted"):
self.not_delete_file(PurePath(elem))
self.custom = self._get_str_dictionary(data, "custom")
self.encountered_errors = self._get_list_of_strs(data, "encountered_errors")
self.encountered_warnings = self._get_list_of_strs(data, "encountered_warnings")
return self
@ -124,9 +143,12 @@ class Report:
"changed": [str(path) for path in sorted(self.changed_files)],
"deleted": [str(path) for path in sorted(self.deleted_files)],
"not_deleted": [str(path) for path in sorted(self.not_deleted_files)],
"custom": self.custom,
"encountered_warnings": self.encountered_warnings,
"encountered_errors": self.encountered_errors,
}
with open(path, "w") as f:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True)
f.write("\n") # json.dump doesn't do this
@ -190,3 +212,27 @@ class Report:
"""
self.not_deleted_files.add(path)
def add_custom_value(self, key: str, value: Any) -> None:
"""
Adds a custom value under the passed key, overwriting any existing
"""
self.custom[key] = value
def get_custom_value(self, key: str) -> Optional[Any]:
"""
Retrieves a custom value for the given key.
"""
return self.custom.get(key)
def add_error(self, error: str) -> None:
"""
Adds an error to this report's error list.
"""
self.encountered_errors.append(error)
def add_warning(self, warning: str) -> None:
"""
Adds a warning to this report's warning list.
"""
self.encountered_warnings.append(warning)

View File

@ -1,151 +1,166 @@
# I'm sorry that this code has become a bit dense and unreadable. While
# reading, it is important to remember what True and False mean. I'd love to
# have some proper sum-types for the inputs and outputs, they'd make this code
# a lot easier to understand.
import ast
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import PurePath
from typing import Dict, Optional, Sequence, Union
from typing import Callable, Dict, List, Optional, Sequence, TypeVar, Union
from .logging import log
from .utils import fmt_path
from .utils import fmt_path, str_path
class Rule(ABC):
class ArrowHead(Enum):
NORMAL = 0
SEQUENCE = 1
class Ignore:
pass
class Empty:
pass
RightSide = Union[str, Ignore, Empty]
@dataclass
class Transformed:
path: PurePath
class Ignored:
pass
TransformResult = Optional[Union[Transformed, Ignored]]
@dataclass
class Rule:
left: str
left_index: int
name: str
head: ArrowHead
right: RightSide
right_index: int
def right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]:
if isinstance(self.right, str):
return self.right
elif isinstance(self.right, Ignore):
return Ignored()
elif isinstance(self.right, Empty):
return Transformed(path)
else:
raise RuntimeError(f"Right side has invalid type {type(self.right)}")
class Transformation(ABC):
def __init__(self, rule: Rule):
self.rule = rule
@abstractmethod
def transform(self, path: PurePath) -> Union[PurePath, bool]:
"""
Try to apply this rule to the path. Returns another path if the rule
was successfully applied, True if the rule matched but resulted in an
exclamation mark, and False if the rule didn't match at all.
"""
def transform(self, path: PurePath) -> TransformResult:
pass
# These rules all use a Union[T, bool] for their right side. They are passed a
# T if the arrow's right side was a normal string, True if it was an
# exclamation mark and False if it was missing entirely.
class NormalRule(Rule):
def __init__(self, left: PurePath, right: Union[PurePath, bool]):
self._left = left
self._right = right
def _match_prefix(self, path: PurePath) -> Optional[PurePath]:
left_parts = list(reversed(self._left.parts))
path_parts = list(reversed(path.parts))
if len(left_parts) > len(path_parts):
class ExactTf(Transformation):
def transform(self, path: PurePath) -> TransformResult:
if path != PurePath(self.rule.left):
return None
while left_parts and path_parts:
left_part = left_parts.pop()
path_part = path_parts.pop()
right = self.rule.right_result(path)
if not isinstance(right, str):
return right
if left_part != path_part:
return None
return Transformed(PurePath(right))
if left_parts:
class ExactReTf(Transformation):
def transform(self, path: PurePath) -> TransformResult:
match = re.fullmatch(self.rule.left, str_path(path))
if not match:
return None
path_parts.reverse()
return PurePath(*path_parts)
right = self.rule.right_result(path)
if not isinstance(right, str):
return right
def transform(self, path: PurePath) -> Union[PurePath, bool]:
if rest := self._match_prefix(path):
if isinstance(self._right, bool):
return self._right or path
# For some reason, mypy thinks that "groups" has type List[str]. But
# since elements of "match.groups()" can be None, mypy is wrong.
groups: Sequence[Optional[str]] = [match[0]] + list(match.groups())
locals_dir: Dict[str, Union[str, int, float]] = {}
for i, group in enumerate(groups):
if group is None:
continue
locals_dir[f"g{i}"] = group
try:
locals_dir[f"i{i}"] = int(group)
except ValueError:
pass
try:
locals_dir[f"f{i}"] = float(group)
except ValueError:
pass
result = eval(f"f{right!r}", {}, locals_dir)
return Transformed(PurePath(result))
class RenamingParentsTf(Transformation):
def __init__(self, sub_tf: Transformation):
super().__init__(sub_tf.rule)
self.sub_tf = sub_tf
def transform(self, path: PurePath) -> TransformResult:
for i in range(len(path.parts), -1, -1):
parent = PurePath(*path.parts[:i])
child = PurePath(*path.parts[i:])
transformed = self.sub_tf.transform(parent)
if not transformed:
continue
elif isinstance(transformed, Transformed):
return Transformed(transformed.path / child)
elif isinstance(transformed, Ignored):
return transformed
else:
return self._right / rest
raise RuntimeError(f"Invalid transform result of type {type(transformed)}: {transformed}")
return False
return None
class ExactRule(Rule):
def __init__(self, left: PurePath, right: Union[PurePath, bool]):
self._left = left
self._right = right
class RenamingPartsTf(Transformation):
def __init__(self, sub_tf: Transformation):
super().__init__(sub_tf.rule)
self.sub_tf = sub_tf
def transform(self, path: PurePath) -> Union[PurePath, bool]:
if path == self._left:
if isinstance(self._right, bool):
return self._right or path
else:
return self._right
return False
class NameRule(Rule):
def __init__(self, subrule: Rule):
self._subrule = subrule
def transform(self, path: PurePath) -> Union[PurePath, bool]:
matched = False
def transform(self, path: PurePath) -> TransformResult:
result = PurePath()
any_part_matched = False
for part in path.parts:
part_result = self._subrule.transform(PurePath(part))
if isinstance(part_result, PurePath):
matched = True
result /= part_result
elif part_result:
# If any subrule call ignores its path segment, the entire path
# should be ignored
return True
else:
# The subrule doesn't modify this segment, but maybe other
# segments
transformed = self.sub_tf.transform(PurePath(part))
if not transformed:
result /= part
elif isinstance(transformed, Transformed):
result /= transformed.path
any_part_matched = True
elif isinstance(transformed, Ignored):
return transformed
else:
raise RuntimeError(f"Invalid transform result of type {type(transformed)}: {transformed}")
if matched:
return result
if any_part_matched:
return Transformed(result)
else:
# The subrule has modified no segments, so this name version of it
# doesn't match
return False
class ReRule(Rule):
def __init__(self, left: str, right: Union[str, bool]):
self._left = left
self._right = right
def transform(self, path: PurePath) -> Union[PurePath, bool]:
if match := re.fullmatch(self._left, str(path)):
if isinstance(self._right, bool):
return self._right or path
vars: Dict[str, Union[str, int, float]] = {}
# For some reason, mypy thinks that "groups" has type List[str].
# But since elements of "match.groups()" can be None, mypy is
# wrong.
groups: Sequence[Optional[str]] = [match[0]] + list(match.groups())
for i, group in enumerate(groups):
if group is None:
continue
vars[f"g{i}"] = group
try:
vars[f"i{i}"] = int(group)
except ValueError:
pass
try:
vars[f"f{i}"] = float(group)
except ValueError:
pass
result = eval(f"f{self._right!r}", vars)
return PurePath(result)
return False
return None
class RuleParseError(Exception):
@ -162,18 +177,15 @@ class RuleParseError(Exception):
log.error_contd(f"{spaces}^--- {self.reason}")
T = TypeVar("T")
class Line:
def __init__(self, line: str, line_nr: int):
self._line = line
self._line_nr = line_nr
self._index = 0
def get(self) -> Optional[str]:
if self._index < len(self._line):
return self._line[self._index]
return None
@property
def line(self) -> str:
return self._line
@ -190,155 +202,196 @@ class Line:
def index(self, index: int) -> None:
self._index = index
def advance(self) -> None:
self._index += 1
@property
def rest(self) -> str:
return self.line[self.index:]
def expect(self, string: str) -> None:
for char in string:
if self.get() == char:
self.advance()
else:
raise RuleParseError(self, f"Expected {char!r}")
def peek(self, amount: int = 1) -> str:
return self.rest[:amount]
def take(self, amount: int = 1) -> str:
string = self.peek(amount)
self.index += len(string)
return string
def expect(self, string: str) -> str:
if self.peek(len(string)) == string:
return self.take(len(string))
else:
raise RuleParseError(self, f"Expected {string!r}")
def expect_with(self, string: str, value: T) -> T:
self.expect(string)
return value
def one_of(self, parsers: List[Callable[[], T]], description: str) -> T:
for parser in parsers:
index = self.index
try:
return parser()
except RuleParseError:
self.index = index
raise RuleParseError(self, description)
# RULE = LEFT SPACE '-' NAME '-' HEAD (SPACE RIGHT)?
# SPACE = ' '+
# NAME = '' | 'exact' | 'name' | 're' | 'exact-re' | 'name-re'
# HEAD = '>' | '>>'
# LEFT = STR | QUOTED_STR
# RIGHT = STR | QUOTED_STR | '!'
def parse_zero_or_more_spaces(line: Line) -> None:
while line.peek() == " ":
line.take()
def parse_one_or_more_spaces(line: Line) -> None:
line.expect(" ")
parse_zero_or_more_spaces(line)
def parse_str(line: Line) -> str:
result = []
while c := line.peek():
if c == " ":
break
else:
line.take()
result.append(c)
if result:
return "".join(result)
else:
raise RuleParseError(line, "Expected non-space character")
QUOTATION_MARKS = {'"', "'"}
def parse_string_literal(line: Line) -> str:
def parse_quoted_str(line: Line) -> str:
escaped = False
# Points to first character of string literal
start_index = line.index
quotation_mark = line.get()
quotation_mark = line.peek()
if quotation_mark not in QUOTATION_MARKS:
# This should never happen as long as this function is only called from
# parse_string.
raise RuleParseError(line, "Invalid quotation mark")
line.advance()
raise RuleParseError(line, "Expected quotation mark")
line.take()
while c := line.get():
while c := line.peek():
if escaped:
escaped = False
line.advance()
line.take()
elif c == quotation_mark:
line.advance()
line.take()
stop_index = line.index
literal = line.line[start_index:stop_index]
return ast.literal_eval(literal)
try:
return ast.literal_eval(literal)
except SyntaxError as e:
line.index = start_index
raise RuleParseError(line, str(e)) from e
elif c == "\\":
escaped = True
line.advance()
line.take()
else:
line.advance()
line.take()
raise RuleParseError(line, "Expected end of string literal")
def parse_until_space_or_eol(line: Line) -> str:
result = []
while c := line.get():
if c == " ":
break
result.append(c)
line.advance()
return "".join(result)
def parse_string(line: Line) -> Union[str, bool]:
if line.get() in QUOTATION_MARKS:
return parse_string_literal(line)
def parse_left(line: Line) -> str:
if line.peek() in QUOTATION_MARKS:
return parse_quoted_str(line)
else:
string = parse_until_space_or_eol(line)
return parse_str(line)
def parse_right(line: Line) -> Union[str, Ignore]:
c = line.peek()
if c in QUOTATION_MARKS:
return parse_quoted_str(line)
else:
string = parse_str(line)
if string == "!":
return True
return Ignore()
return string
def parse_arrow(line: Line) -> str:
line.expect("-")
name = []
while True:
c = line.get()
if not c:
raise RuleParseError(line, "Expected rest of arrow")
elif c == "-":
line.advance()
c = line.get()
if not c:
raise RuleParseError(line, "Expected rest of arrow")
elif c == ">":
line.advance()
break # End of arrow
else:
name.append("-")
continue
else:
name.append(c)
line.advance()
return "".join(name)
def parse_arrow_name(line: Line) -> str:
return line.one_of([
lambda: line.expect("exact-re"),
lambda: line.expect("exact"),
lambda: line.expect("name-re"),
lambda: line.expect("name"),
lambda: line.expect("re"),
lambda: line.expect(""),
], "Expected arrow name")
def parse_whitespace(line: Line) -> None:
line.expect(" ")
while line.get() == " ":
line.advance()
def parse_arrow_head(line: Line) -> ArrowHead:
return line.one_of([
lambda: line.expect_with(">>", ArrowHead.SEQUENCE),
lambda: line.expect_with(">", ArrowHead.NORMAL),
], "Expected arrow head")
def parse_eol(line: Line) -> None:
if line.get() is not None:
if line.peek():
raise RuleParseError(line, "Expected end of line")
def parse_rule(line: Line) -> Rule:
# Parse left side
leftindex = line.index
left = parse_string(line)
if isinstance(left, bool):
line.index = leftindex
raise RuleParseError(line, "Left side can't be '!'")
leftpath = PurePath(left)
parse_zero_or_more_spaces(line)
left_index = line.index
left = parse_left(line)
# Parse arrow
parse_whitespace(line)
arrowindex = line.index
arrowname = parse_arrow(line)
parse_one_or_more_spaces(line)
# Parse right side
if line.get():
parse_whitespace(line)
right = parse_string(line)
line.expect("-")
name = parse_arrow_name(line)
line.expect("-")
head = parse_arrow_head(line)
right_index = line.index
right: RightSide
try:
parse_zero_or_more_spaces(line)
parse_eol(line)
right = Empty()
except RuleParseError:
line.index = right_index
parse_one_or_more_spaces(line)
right = parse_right(line)
parse_eol(line)
return Rule(left, left_index, name, head, right, right_index)
def parse_transformation(line: Line) -> Transformation:
rule = parse_rule(line)
if rule.name == "":
return RenamingParentsTf(ExactTf(rule))
elif rule.name == "exact":
return ExactTf(rule)
elif rule.name == "name":
if len(PurePath(rule.left).parts) > 1:
line.index = rule.left_index
raise RuleParseError(line, "Expected name, not multiple segments")
return RenamingPartsTf(ExactTf(rule))
elif rule.name == "re":
return RenamingParentsTf(ExactReTf(rule))
elif rule.name == "exact-re":
return ExactReTf(rule)
elif rule.name == "name-re":
return RenamingPartsTf(ExactReTf(rule))
else:
right = False
rightpath: Union[PurePath, bool]
if isinstance(right, bool):
rightpath = right
else:
rightpath = PurePath(right)
parse_eol(line)
# Dispatch
if arrowname == "":
return NormalRule(leftpath, rightpath)
elif arrowname == "name":
if len(leftpath.parts) > 1:
line.index = leftindex
raise RuleParseError(line, "SOURCE must be a single name, not multiple segments")
return NameRule(ExactRule(leftpath, rightpath))
elif arrowname == "exact":
return ExactRule(leftpath, rightpath)
elif arrowname == "re":
return ReRule(left, right)
elif arrowname == "name-re":
return NameRule(ReRule(left, right))
else:
line.index = arrowindex + 1 # For nicer error message
raise RuleParseError(line, f"Invalid arrow name {arrowname!r}")
raise RuntimeError(f"Invalid arrow name {rule.name!r}")
class Transformer:
@ -347,32 +400,40 @@ class Transformer:
May throw a RuleParseException.
"""
self._rules = []
self._tfs = []
for i, line in enumerate(rules.split("\n")):
line = line.strip()
if line:
rule = parse_rule(Line(line, i))
self._rules.append((line, rule))
tf = parse_transformation(Line(line, i))
self._tfs.append((line, tf))
def transform(self, path: PurePath) -> Optional[PurePath]:
for i, (line, rule) in enumerate(self._rules):
for i, (line, tf) in enumerate(self._tfs):
log.explain(f"Testing rule {i+1}: {line}")
try:
result = rule.transform(path)
result = tf.transform(path)
except Exception as e:
log.warn(f"Error while testing rule {i+1}: {line}")
log.warn_contd(str(e))
continue
if isinstance(result, PurePath):
log.explain(f"Match found, transformed path to {fmt_path(result)}")
return result
elif result: # Exclamation mark
log.explain("Match found, path ignored")
return None
else:
if not result:
continue
log.explain("No rule matched, path is unchanged")
if isinstance(result, Ignored):
log.explain("Match found, path ignored")
return None
if tf.rule.head == ArrowHead.NORMAL:
log.explain(f"Match found, transformed path to {fmt_path(result.path)}")
path = result.path
break
elif tf.rule.head == ArrowHead.SEQUENCE:
log.explain(f"Match found, updated path to {fmt_path(result.path)}")
path = result.path
else:
raise RuntimeError(f"Invalid transform result of type {type(result)}: {result}")
log.explain(f"Final result: {fmt_path(path)}")
return path

View File

@ -91,8 +91,14 @@ def url_set_query_params(url: str, params: Dict[str, str]) -> str:
return result
def str_path(path: PurePath) -> str:
if not path.parts:
return "."
return "/".join(path.parts)
def fmt_path(path: PurePath) -> str:
return repr(str(path))
return repr(str_path(path))
def fmt_real_path(path: Path) -> str:

View File

@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.0.1"
VERSION = "3.4.0"

View File

@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip
Ensure you have at least Python 3.8 installed. Run the following command to
Ensure you have at least Python 3.9 installed. Run the following command to
install PFERD or upgrade it to the latest version:
```
@ -26,11 +26,19 @@ $ pip install --upgrade git+https://github.com/Garmelon/PFERD@latest
The use of [venv](https://docs.python.org/3/library/venv.html) is recommended.
### With package managers
Unofficial packages are available for:
- [AUR](https://aur.archlinux.org/packages/pferd)
- [nixpkgs](https://github.com/NixOS/nixpkgs/blob/master/pkgs/tools/misc/pferd/default.nix)
See also PFERD's [repology page](https://repology.org/project/pferd/versions).
## Basic usage
PFERD can be run directly from the command line with no config file.
Run `pferd -h` to get an overview of available commands and options.
Run `pferd <command> -h` to see which options a command has.
PFERD can be run directly from the command line with no config file. Run `pferd
-h` to get an overview of available commands and options. Run `pferd <command>
-h` to see which options a command has.
For example, you can download your personal desktop from the KIT ILIAS like
this:
@ -116,17 +124,18 @@ transform =
Online-Tests --> !
Vorlesungswerbung --> !
# Rename folders
Lehrbücher --> Vorlesung
# Note the ">>" arrow head which lets us apply further rules to files moved to "Übung"
Übungsunterlagen -->> Übung
# Move exercises to own folder. Rename them to "Blatt-XX.pdf" to make them sort properly
"Übungsunterlagen/(\d+). Übungsblatt.pdf" -re-> Blätter/Blatt-{i1:02}.pdf
"Übung/(\d+). Übungsblatt.pdf" -re-> Blätter/Blatt-{i1:02}.pdf
# Move solutions to own folder. Rename them to "Blatt-XX-Lösung.pdf" to make them sort properly
"Übungsunterlagen/(\d+). Übungsblatt.*Musterlösung.pdf" -re-> Blätter/Blatt-{i1:02}-Lösung.pdf
"Übung/(\d+). Übungsblatt.*Musterlösung.pdf" -re-> Blätter/Blatt-{i1:02}-Lösung.pdf
# The course has nested folders with the same name - flatten them
"Übungsunterlagen/(.+?)/\\1/(.*)" -re-> Übung/{g1}/{g2}
# Rename remaining folders
Übungsunterlagen --> Übung
Lehrbücher --> Vorlesung
"Übung/(.+?)/\\1" -re-> Übung/{g1}
[crawl:Bar]
type = kit-ilias-web

View File

@ -12,6 +12,6 @@ pip install --upgrade setuptools
# Installing PFERD itself
pip install --editable .
# Installing various tools
pip install --upgrade mypy flake8 autopep8 isort
pip install --upgrade pyinstaller
# Installing tools and type hints
pip install --upgrade mypy flake8 autopep8 isort pyinstaller
pip install --upgrade types-chardet types-certifi

View File

@ -4,13 +4,13 @@ version = attr: PFERD.version.VERSION
[options]
packages = find:
python_requires = >=3.8
python_requires = >=3.9
install_requires =
aiohttp>=3.7.4.post0
beautifulsoup4>=4.9.3
rich>=10.1.0
keyring>=23.0.1
certifi>=2020.12.5
aiohttp>=3.8.1
beautifulsoup4>=4.10.0
rich>=11.0.0
keyring>=23.5.0
certifi>=2021.10.8
[options.entry_points]
console_scripts =