Compare commits

...

108 Commits

Author SHA1 Message Date
bf27f4a686 TODO 2023-04-19 10:13:36 +02:00
5adfdfbd2b Switch http_crawler to requests 2023-04-19 10:12:48 +02:00
5c3942a13d Fix flake8 error 2023-04-19 10:12:48 +02:00
5c9209b12e Document path formatting functions 2023-04-19 10:12:48 +02:00
50c7778d38 Use mypy to install library stub packages 2023-04-19 10:12:48 +02:00
354a22d1e3 Add vscode settings 2023-04-19 10:12:48 +02:00
6f87c5c774 Make ipd crawler synchronous 2023-04-19 10:12:48 +02:00
1ca10571f0 Remove limiter 2023-04-19 10:12:48 +02:00
10e1a5e871 De-Async ilias crawler 2023-04-19 10:12:48 +02:00
a2ffce4702 Make local crawler synchronous 2023-04-19 10:12:48 +02:00
0294ceb7d5 Update github action versions 2023-03-22 00:10:54 +01:00
6f30c6583d Fix crawling of cards without descriptions 2023-03-21 23:52:33 +01:00
467fc526e8 Fix crawling of file/video cards 2023-03-21 23:52:24 +01:00
722d2eb393 Fix crawling of courses with preselected timeline tab 2023-03-21 23:36:47 +01:00
6d44aac278 Bump version to 3.4.3 2022-11-29 18:22:19 +01:00
55a2de6b88 Fix crawling English opencast 2022-11-29 18:13:56 +01:00
c0d6d8b229 Use url after redirect for relative links 2022-11-21 18:10:45 +01:00
635caa765d Fix typo
Thanks, burg113
2022-11-15 17:17:57 +01:00
e69b55b349 Add more unofficial package managers (#66) 2022-11-04 12:18:26 +01:00
07200bbde5 Document ilias web crawler's forums option 2022-10-31 14:12:27 +01:00
c020cccc64 Include found paths in "second path found" warning 2022-10-29 14:08:29 +02:00
259cfc20cc Bump version to 3.4.2 2022-10-26 18:26:17 +02:00
37b51a66d8 Update changelog 2022-10-26 18:22:37 +02:00
f47d2f11d8 Append trailing slash to kit-ipd links to ensure urljoin works as expected 2022-10-25 20:28:22 +02:00
1b6be6bd79 Handle content pages in cards 2022-10-24 18:37:26 +02:00
e1430e6298 Handle (and ignore) surveys 2022-10-24 18:37:26 +02:00
5fdd40204b Unwrap future meetings when ILIAS hides them behind a pagination 2022-10-24 14:33:58 +02:00
fb4631ba18 Fix ilias background login 2022-10-24 13:13:36 +02:00
d72fc2760b Handle empty forums 2022-10-24 13:12:17 +02:00
4a51aaa4f5 Fix forum crawling crashing for empty threads 2022-10-19 22:59:33 +02:00
66a5b1ba02 Bump version to 3.4.1 2022-08-17 13:24:01 +02:00
aa5a3a10bc Adjust changelog 2022-08-14 21:48:59 +02:00
d9b111cec2 Correctly nest description entries 2022-08-14 21:45:33 +02:00
345f52a1f6 Detect new login button 2022-08-14 21:41:29 +02:00
ed24366aba Add pass authenticator 2022-06-05 10:04:42 +02:00
46fb782798 Add forum crawling
This downloads all forum posts when needed and saves each thread in its
own html file, named after the thread title.
2022-05-24 23:43:53 +02:00
846c29aee1 Download page descriptions 2022-05-11 21:16:56 +02:00
a5015fe9b1 Correctly parse day-only meeting dates
I failed to recognize the correct format in the previous adjustment, so
this (hopefully) fixes it for good.
Meetings apparently don't always have a time portion.
2022-05-08 23:22:26 +02:00
616b0480f7 Simplify IPD crawler link regex 2022-05-08 18:18:05 +02:00
2f0e04ce13 Adjust changelog 2022-05-05 22:57:55 +02:00
bcc537468c Fix crawling of expanded meetings
The last meeting on every page is expanded by default.
Its content is then shown inline *and* in the meeting page itself.
We should skip the inline content.
2022-05-05 22:53:37 +02:00
694ffb4d77 Fix meeting date parsing
Apparently the new pattern "<relative time qualifier>: <date>," was
added. This patch adds support for it.
2022-05-05 22:28:30 +02:00
af2cc1169a Mention href for users of link_regex option 2022-05-05 14:36:03 +02:00
bc3fa36637 Fix IPD crawler crashing on weird HTML comments 2022-05-05 14:35:42 +02:00
afbd03f777 Fix docs 2022-05-05 14:35:42 +02:00
b8fe25c580 Add .cpp to ipd link regex 2022-05-04 14:19:26 +02:00
a241672726 Bump version to 3.4.0 2022-05-01 22:29:06 +02:00
a8f76e9be7 Use utf-8 for credential file 2022-04-29 23:15:12 +02:00
b56475450d Use utf-8 for cookies 2022-04-29 23:12:41 +02:00
aa74604d29 Use utf-8 for report 2022-04-29 23:11:27 +02:00
d2e6d91880 Make PFERD executable via python -m 2022-04-27 22:52:50 +02:00
602044ff1b Fix mypy errors and add missing await 2022-04-27 22:52:50 +02:00
31631fb409 Increase minimum python version to 3.9 2022-04-27 22:52:50 +02:00
00db348218 Update changelog 2022-04-27 22:03:52 +02:00
a709280cbf Try to detect unsupported config file encoding
The encoding detection is quite rudimentary, but should detect the
default windows encoding in many cases.
2022-04-27 22:03:47 +02:00
a99ddaa0cc Read and write config in UTF-8 2022-04-27 21:47:51 +02:00
ba3d299c05 Fix changelog 2022-04-27 21:26:24 +02:00
07a21f80a6 Link to unofficial packages 2022-04-27 21:15:33 +02:00
f17b9b68f4 Add shibboleth authentication fix to changelog 2022-04-27 14:01:40 +02:00
a2831fbea2 Fix shib authentication
Authentication failed previously if the shib session was still valid.
If Shibboleth gets a request and the session is still valid, it directly
responds without a second redirect.
2022-04-27 13:55:24 +02:00
da72863b47 Placate newer mypy 2022-04-03 13:19:08 +02:00
86e2e226dc Notify user when shibboleth presents new entitlements 2022-04-03 11:37:08 +02:00
7872fe5221 Fix tables with more columns than expected 2022-01-18 22:38:48 +01:00
86947e4874 Bump version to 3.3.1 2022-01-15 15:11:22 +01:00
4f022e2d19 Reword changelog 2022-01-15 15:06:02 +01:00
f47e7374d2 Use fixed windows path for video cache 2022-01-15 12:00:30 +01:00
57ec51e95a Fix login after shib url parser change 2022-01-14 20:17:27 +01:00
0045124a4e Bump version to 3.3.0 2022-01-09 21:09:09 +01:00
9618aae83b Add content pages to changelog 2022-01-09 18:32:58 +01:00
33453ede2d Update dependency versions in setup.py 2022-01-09 18:31:42 +01:00
e467b38d73 Only reject 1970 timestamps on windows 2022-01-09 18:23:00 +01:00
e9d2d05030 Update changelog 2022-01-09 11:48:26 +01:00
4bf0c972e6 Update types for rich 11 2022-01-09 11:48:26 +01:00
4ee919625d Add rudimentary support for content pages 2022-01-08 20:47:35 +01:00
d30f25ee97 Detect shib login page as login page
And do not assume we are logged in...
2022-01-08 20:28:45 +01:00
10d9d74528 Bail out when crawling recursive courses 2022-01-08 20:28:30 +01:00
43c5453e10 Correctly crawl files on desktop
The files on the desktop do not include a download link, so we need to
rewrite it.
2022-01-08 20:00:53 +01:00
eb4de8ae0c Ignore 1970 dates as windows crashes when calling .timestamp() 2022-01-08 18:14:43 +01:00
e32c1f000f Fix mtime for single streams 2022-01-08 18:05:48 +01:00
5f527bc697 Remove Python 3.9 Pattern typehints 2022-01-08 17:14:40 +01:00
ced8b9a2d0 Fix some accordions 2022-01-08 16:58:30 +01:00
6f3cfd4396 Fix personal desktop crawling 2022-01-08 16:58:15 +01:00
462d993fbc Fix local video path cache (hopefully) 2022-01-08 00:27:48 +01:00
a99356f2a2 Fix video stream extraction 2022-01-08 00:27:34 +01:00
eac2e34161 Fix is_logged_in for ILIAS 7 2022-01-07 23:32:31 +01:00
a82a0b19c2 Collect crawler warnings/errors and include them in the report 2021-11-07 21:48:55 +01:00
90cb6e989b Do not download single videos if cache does not exist 2021-11-06 23:21:15 +01:00
6289938d7c Do not stop crawling files when encountering a CrawlWarning 2021-11-06 12:09:51 +01:00
13b8c3d9c6 Add regex option to config and CLI parser 2021-11-02 09:30:46 +01:00
88afe64a92 Refactor IPD crawler a bit 2021-11-02 01:25:01 +00:00
6b2a657573 Fix IPD crawler for different subpages (#42)
This patch reworks the IPD crawler to support subpages which do not use
"/intern" for links and fetches the folder names from table headings.
2021-11-02 01:25:01 +00:00
d6f38a61e1 Fixed minor spelling mistakes 2021-11-02 01:54:00 +01:00
ad3f4955f7 Update changelog 2021-10-30 18:14:39 +02:00
e42ab83d32 Add support for ILIAS cards 2021-10-30 18:13:44 +02:00
f9a3f9b9f2 Handle multi-stream videos 2021-10-30 18:12:29 +02:00
ef7d5ea2d3 Allow storing crawler-specific data in reports 2021-10-30 18:09:05 +02:00
55ea304ff3 Disable interpolation of ConfigParser 2021-10-25 23:37:42 +02:00
fee12b3d9e Fix changelog 2021-10-25 17:44:12 +00:00
6673077397 Add kit-ipd crawler 2021-10-21 13:20:21 +02:00
742632ed8d Bump version to 3.2.0 2021-08-04 18:27:26 +00:00
544d45cbc5 Catch non-critical exceptions at crawler top level 2021-07-13 15:42:11 +02:00
86f79ff1f1 Update changelog 2021-07-07 15:23:58 +02:00
ee67f9f472 Sort elements by ILIAS id to ensure deterministic ordering 2021-07-06 17:45:48 +02:00
8ec3f41251 Crawl ilias booking objects as links 2021-07-06 16:15:25 +02:00
89be07d4d3 Use final crawl path in HTML parsing message 2021-07-03 17:05:48 +02:00
91200f3684 Fix nondeterministic name deduplication 2021-07-03 12:09:55 +02:00
9ffd603357 Error when using multiple segments with -name->
Previously, PFERD just silently never matched the -name-> arrow. Now, it errors
when loading the config file.
2021-07-01 11:14:50 +02:00
80eeb8fe97 Add --skip option 2021-07-01 11:02:21 +02:00
35 changed files with 1737 additions and 410 deletions

View File

@ -14,12 +14,12 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest, macos-latest] os: [ubuntu-latest, windows-latest, macos-latest]
python: ["3.8"] python: ["3.9"]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v2 - uses: actions/setup-python@v4
with: with:
python-version: ${{ matrix.python }} python-version: ${{ matrix.python }}
@ -45,7 +45,7 @@ jobs:
run: mv dist/pferd* dist/pferd-${{ matrix.os }} run: mv dist/pferd* dist/pferd-${{ matrix.os }}
- name: Upload binary - name: Upload binary
uses: actions/upload-artifact@v2 uses: actions/upload-artifact@v3
with: with:
name: Binaries name: Binaries
path: dist/pferd-${{ matrix.os }} path: dist/pferd-${{ matrix.os }}
@ -57,7 +57,7 @@ jobs:
steps: steps:
- name: Download binaries - name: Download binaries
uses: actions/download-artifact@v2 uses: actions/download-artifact@v3
with: with:
name: Binaries name: Binaries

1
.gitignore vendored
View File

@ -2,7 +2,6 @@
/.venv/ /.venv/
/PFERD.egg-info/ /PFERD.egg-info/
__pycache__/ __pycache__/
/.vscode/
# pyinstaller # pyinstaller
/pferd.spec /pferd.spec

8
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,8 @@
{
"files.insertFinalNewline": true,
"files.trimFinalNewlines": true,
"python.formatting.provider": "autopep8",
"python.linting.enabled": true,
"python.linting.flake8Enabled": true,
"python.linting.mypyEnabled": true,
}

View File

@ -22,6 +22,109 @@ ambiguous situations.
## Unreleased ## Unreleased
### Fixed
- Crawling of courses with the timeline view as the default tab
- Crawling of file and custom opencast cards
- Crawling of button cards without descriptions
## 3.4.3 - 2022-11-29
### Added
- Missing documentation for `forums` option
### Changed
- Clear up error message shown when multiple paths are found to an element
### Fixed
- IPD crawler unnecessarily appending trailing slashes
- Crawling opencast when ILIAS is set to English
## 3.4.2 - 2022-10-26
### Added
- Recognize and crawl content pages in cards
- Recognize and ignore surveys
### Fixed
- Forum crawling crashing when a thread has no messages at all
- Forum crawling crashing when a forum has no threads at all
- Ilias login failing in some cases
- Crawling of paginated future meetings
- IPD crawler handling of URLs without trailing slash
## 3.4.1 - 2022-08-17
### Added
- Download of page descriptions
- Forum download support
- `pass` authenticator
### Changed
- Add `cpp` extension to default `link_regex` of IPD crawler
- Mention hrefs in IPD crawler's `--explain` output for users of `link_regex` option
- Simplify default IPD crawler `link_regex`
### Fixed
- IPD crawler crashes on some sites
- Meeting name normalization for yesterday, today and tomorrow
- Crawling of meeting file previews
- Login with new login button html layout
- Descriptions for courses are now placed in the correct subfolder when
downloading the whole desktop
## 3.4.0 - 2022-05-01
### Added
- Message when Shibboleth entitlements need to be manually reviewed
- Links to unofficial packages and repology in the readme
### Changed
- Increase minimum supported Python version to 3.9
- Support video listings with more columns
- Use UTF-8 when reading/writing the config file
### Fixed
- Crash during authentication when the Shibboleth session is still valid
## 3.3.1 - 2022-01-15
### Fixed
- ILIAS login
- Local video cache if `windows_paths` is enabled
## 3.3.0 - 2022-01-09
### Added
- A KIT IPD crawler
- Support for ILIAS cards
- (Rudimentary) support for content pages
- Support for multi-stream videos
- Support for ILIAS 7
### Removed
- [Interpolation](https://docs.python.org/3/library/configparser.html#interpolation-of-values) in config file
### Fixed
- Crawling of recursive courses
- Crawling files directly placed on the personal desktop
- Ignore timestamps at the unix epoch as they crash on windows
## 3.2.0 - 2021-08-04
### Added
- `--skip` command line option
- Support for ILIAS booking objects
### Changed
- Using multiple path segments on left side of `-name->` now results in an
error. This was already forbidden by the documentation but silently accepted
by PFERD.
- More consistent path printing in some `--explain` messages
### Fixed
- Nondeterministic name deduplication due to ILIAS reordering elements
- More exceptions are handled properly
## 3.1.0 - 2021-06-13 ## 3.1.0 - 2021-06-13
If your config file doesn't do weird things with transforms, it should continue If your config file doesn't do weird things with transforms, it should continue

View File

@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header,
which is followed by a list of `key = value` pairs. Comments must be on their which is followed by a list of `key = value` pairs. Comments must be on their
own line and start with `#`. Multiline values must be indented beyond their key. own line and start with `#`. Multiline values must be indented beyond their key.
Boolean values can be `yes` or `no`. For more details and some examples on the Boolean values can be `yes` or `no`. For more details and some examples on the
format, see the [configparser documentation][1] ([basic interpolation][2] is format, see the [configparser documentation][1] ([interpolation][2] is
enabled). disabled).
[1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure" [1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
[2]: <https://docs.python.org/3/library/configparser.html#configparser.BasicInterpolation> "BasicInterpolation" [2]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
## The `DEFAULT` section ## The `DEFAULT` section
@ -36,7 +36,7 @@ Sections whose names start with `crawl:` are used to configure crawlers. The
rest of the section name specifies the name of the crawler. rest of the section name specifies the name of the crawler.
A crawler synchronizes a remote resource to a local directory. There are A crawler synchronizes a remote resource to a local directory. There are
different types of crawlers for different kinds of resources, e. g. ILIAS different types of crawlers for different kinds of resources, e.g. ILIAS
courses or lecture websites. courses or lecture websites.
Each crawl section represents an instance of a specific type of crawler. The Each crawl section represents an instance of a specific type of crawler. The
@ -53,7 +53,7 @@ common to all crawlers:
crawler can still be executed manually using the `--crawler` or `-C` flags. crawler can still be executed manually using the `--crawler` or `-C` flags.
(Default: `no`) (Default: `no`)
- `output_dir`: The directory the crawler synchronizes files to. A crawler will - `output_dir`: The directory the crawler synchronizes files to. A crawler will
never place any files outside of this directory. (Default: the crawler's name) never place any files outside this directory. (Default: the crawler's name)
- `redownload`: When to download a file that is already present locally. - `redownload`: When to download a file that is already present locally.
(Default: `never-smart`) (Default: `never-smart`)
- `never`: If a file is present locally, it is not downloaded again. - `never`: If a file is present locally, it is not downloaded again.
@ -136,6 +136,18 @@ crawler simulate a slower, network-based crawler.
requests. (Default: `0.0`) requests. (Default: `0.0`)
- `download_speed`: Download speed (in bytes per second) to simulate. (Optional) - `download_speed`: Download speed (in bytes per second) to simulate. (Optional)
### The `kit-ipd` crawler
This crawler crawls a KIT-IPD page by url. The root page can be crawled from
outside the KIT network so you will be informed about any new/deleted files,
but downloading files requires you to be within. Adding a show delay between
requests is likely a good idea.
- `target`: URL to a KIT-IPD page
- `link_regex`: A regex that is matched against the `href` part of links. If it
matches, the given link is downloaded as a file. This is used to extract
files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`)
### The `kit-ilias-web` crawler ### The `kit-ilias-web` crawler
This crawler crawls the KIT ILIAS instance. This crawler crawls the KIT ILIAS instance.
@ -169,6 +181,7 @@ script once per day should be fine.
redirect to the actual URL. Set to a negative value to disable the automatic redirect to the actual URL. Set to a negative value to disable the automatic
redirect. (Default: `-1`) redirect. (Default: `-1`)
- `videos`: Whether to download videos. (Default: `no`) - `videos`: Whether to download videos. (Default: `no`)
- `forums`: Whether to download forum threads. (Default: `no`)
- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default: - `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default:
`20.0`) `20.0`)
@ -211,6 +224,23 @@ is stored in the keyring.
- `keyring_name`: The service name PFERD uses for storing credentials. (Default: - `keyring_name`: The service name PFERD uses for storing credentials. (Default:
`PFERD`) `PFERD`)
### The `pass` authenticator
This authenticator queries the [`pass` password manager][3] for a username and
password. It tries to be mostly compatible with [browserpass][4] and
[passff][5], so see those links for an overview of the format. If PFERD fails
to load your password, you can use the `--explain` flag to see why.
- `passname`: The name of the password to use (Required)
- `username_prefixes`: A comma-separated list of username line prefixes
(Default: `login,username,user`)
- `password_prefixes`: A comma-separated list of password line prefixes
(Default: `password,pass,secret`)
[3]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager"
[4]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store"
[5]: <https://github.com/passff/passff#multi-line-format> "Multi-line format"
### The `tfa` authenticator ### The `tfa` authenticator
This authenticator prompts the user on the console for a two-factor This authenticator prompts the user on the console for a two-factor
@ -260,7 +290,7 @@ path matches `SOURCE`, it is renamed to `TARGET`.
Example: `foo/bar --> baz` Example: `foo/bar --> baz`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz` - Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Converts `foo/bar` into `baz` - Converts `foo/bar` into `baz`
- Converts `foo/bar/wargl` into `bar/wargl` - Converts `foo/bar/wargl` into `baz/wargl`
Example: `foo/bar --> !` Example: `foo/bar --> !`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz` - Doesn't match `foo`, `a/foo/bar` or `foo/baz`
@ -304,12 +334,12 @@ is a regular expression and `TARGET` an f-string based template. If a path
matches `SOURCE`, the output path is created using `TARGET` as template. matches `SOURCE`, the output path is created using `TARGET` as template.
`SOURCE` is automatically anchored. `SOURCE` is automatically anchored.
`TARGET` uses Python's [format string syntax][3]. The *n*-th capturing group can `TARGET` uses Python's [format string syntax][6]. The *n*-th capturing group can
be referred to as `{g<n>}` (e. g. `{g3}`). `{g0}` refers to the original path. be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path.
If capturing group *n*'s contents are a valid integer, the integer value is If capturing group *n*'s contents are a valid integer, the integer value is
available as `{i<n>}` (e. g. `{i3}`). If capturing group *n*'s contents are a available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a
valid float, the float value is available as `{f<n>}` (e. g. `{f3}`). If a valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). If a
capturing group is not present (e. g. when matching the string `cd` with the capturing group is not present (e.g. when matching the string `cd` with the
regex `(ab)?cd`), the corresponding variables are not defined. regex `(ab)?cd`), the corresponding variables are not defined.
Python's format string syntax has rich options for formatting its arguments. For Python's format string syntax has rich options for formatting its arguments. For
@ -325,7 +355,7 @@ Example: `f(oo+)/be?ar -re-> B{g1.upper()}H/fear`
- Converts `fooooo/bear` into `BOOOOOH/fear` - Converts `fooooo/bear` into `BOOOOOH/fear`
- Converts `foo/bar/baz` into `BOOH/fear/baz` - Converts `foo/bar/baz` into `BOOH/fear/baz`
[3]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax" [6]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax"
### The `-name-re->` arrow ### The `-name-re->` arrow

View File

@ -1,4 +1,5 @@
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero
Permission is hereby granted, free of charge, to any person obtaining a copy of Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in this software and associated documentation files (the "Software"), to deal in

View File

@ -15,7 +15,7 @@ from .transformer import RuleParseError
def load_config_parser(args: argparse.Namespace) -> configparser.ConfigParser: def load_config_parser(args: argparse.Namespace) -> configparser.ConfigParser:
log.explain_topic("Loading config") log.explain_topic("Loading config")
parser = configparser.ConfigParser() parser = configparser.ConfigParser(interpolation=None)
if args.command is None: if args.command is None:
log.explain("No CLI command specified, loading config from file") log.explain("No CLI command specified, loading config from file")
@ -116,7 +116,7 @@ def main() -> None:
sys.exit() sys.exit()
try: try:
pferd = Pferd(config, args.crawler) pferd = Pferd(config, args.crawler, args.skip)
except PferdLoadError as e: except PferdLoadError as e:
log.unlock() log.unlock()
log.error(str(e)) log.error(str(e))
@ -159,3 +159,7 @@ def main() -> None:
sys.exit(1) sys.exit(1)
else: else:
pferd.print_report() pferd.print_report()
if __name__ == "__main__":
main()

View File

@ -5,6 +5,7 @@ from ..config import Config
from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401 from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401
from .credential_file import CredentialFileAuthenticator, CredentialFileAuthSection from .credential_file import CredentialFileAuthenticator, CredentialFileAuthSection
from .keyring import KeyringAuthenticator, KeyringAuthSection from .keyring import KeyringAuthenticator, KeyringAuthSection
from .pass_ import PassAuthenticator, PassAuthSection
from .simple import SimpleAuthenticator, SimpleAuthSection from .simple import SimpleAuthenticator, SimpleAuthSection
from .tfa import TfaAuthenticator from .tfa import TfaAuthenticator
@ -19,6 +20,8 @@ AUTHENTICATORS: Dict[str, AuthConstructor] = {
CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c),
"keyring": lambda n, s, c: "keyring": lambda n, s, c:
KeyringAuthenticator(n, KeyringAuthSection(s)), KeyringAuthenticator(n, KeyringAuthSection(s)),
"pass": lambda n, s, c:
PassAuthenticator(n, PassAuthSection(s)),
"simple": lambda n, s, c: "simple": lambda n, s, c:
SimpleAuthenticator(n, SimpleAuthSection(s)), SimpleAuthenticator(n, SimpleAuthSection(s)),
"tfa": lambda n, s, c: "tfa": lambda n, s, c:

View File

@ -20,8 +20,10 @@ class CredentialFileAuthenticator(Authenticator):
path = config.default_section.working_dir() / section.path() path = config.default_section.working_dir() / section.path()
try: try:
with open(path) as f: with open(path, encoding="utf-8") as f:
lines = list(f) lines = list(f)
except UnicodeDecodeError:
raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8")
except OSError as e: except OSError as e:
raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e

98
PFERD/auth/pass_.py Normal file
View File

@ -0,0 +1,98 @@
import re
import subprocess
from typing import List, Tuple
from ..logging import log
from .authenticator import Authenticator, AuthError, AuthSection
class PassAuthSection(AuthSection):
def passname(self) -> str:
if (value := self.s.get("passname")) is None:
self.missing_value("passname")
return value
def username_prefixes(self) -> List[str]:
value = self.s.get("username_prefixes", "login,username,user")
return [prefix.lower() for prefix in value.split(",")]
def password_prefixes(self) -> List[str]:
value = self.s.get("password_prefixes", "password,pass,secret")
return [prefix.lower() for prefix in value.split(",")]
class PassAuthenticator(Authenticator):
PREFIXED_LINE_RE = r"([a-zA-Z]+):\s?(.*)" # to be used with fullmatch
def __init__(self, name: str, section: PassAuthSection) -> None:
super().__init__(name)
self._passname = section.passname()
self._username_prefixes = section.username_prefixes()
self._password_prefixes = section.password_prefixes()
async def credentials(self) -> Tuple[str, str]:
log.explain_topic("Obtaining credentials from pass")
try:
log.explain(f"Calling 'pass show {self._passname}'")
result = subprocess.check_output(["pass", "show", self._passname], text=True)
except subprocess.CalledProcessError as e:
raise AuthError(f"Failed to get password info from {self._passname}: {e}")
prefixed = {}
unprefixed = []
for line in result.strip().splitlines():
if match := re.fullmatch(self.PREFIXED_LINE_RE, line):
prefix = match.group(1).lower()
value = match.group(2)
log.explain(f"Found prefixed line {line!r} with prefix {prefix!r}, value {value!r}")
if prefix in prefixed:
raise AuthError(f"Prefix {prefix} specified multiple times")
prefixed[prefix] = value
else:
log.explain(f"Found unprefixed line {line!r}")
unprefixed.append(line)
username = None
for prefix in self._username_prefixes:
log.explain(f"Looking for username at prefix {prefix!r}")
if prefix in prefixed:
username = prefixed[prefix]
log.explain(f"Found username {username!r}")
break
password = None
for prefix in self._password_prefixes:
log.explain(f"Looking for password at prefix {prefix!r}")
if prefix in prefixed:
password = prefixed[prefix]
log.explain(f"Found password {password!r}")
break
if password is None and username is None:
log.explain("No username and password found so far")
log.explain("Using first unprefixed line as password")
log.explain("Using second unprefixed line as username")
elif password is None:
log.explain("No password found so far")
log.explain("Using first unprefixed line as password")
elif username is None:
log.explain("No username found so far")
log.explain("Using first unprefixed line as username")
if password is None:
if not unprefixed:
log.explain("Not enough unprefixed lines left")
raise AuthError("Password could not be determined")
password = unprefixed.pop(0)
log.explain(f"Found password {password!r}")
if username is None:
if not unprefixed:
log.explain("Not enough unprefixed lines left")
raise AuthError("Username could not be determined")
username = unprefixed.pop(0)
log.explain(f"Found username {username!r}")
return username, password

View File

@ -9,4 +9,5 @@
from . import command_local # noqa: F401 imported but unused from . import command_local # noqa: F401 imported but unused
from . import command_kit_ilias_web # noqa: F401 imported but unused from . import command_kit_ilias_web # noqa: F401 imported but unused
from . import command_kit_ipd # noqa: F401 imported but unused
from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused

View File

@ -62,6 +62,11 @@ GROUP.add_argument(
action=BooleanOptionalAction, action=BooleanOptionalAction,
help="crawl and download videos" help="crawl and download videos"
) )
GROUP.add_argument(
"--forums",
action=BooleanOptionalAction,
help="crawl and download forum posts"
)
GROUP.add_argument( GROUP.add_argument(
"--http-timeout", "-t", "--http-timeout", "-t",
type=float, type=float,
@ -90,6 +95,8 @@ def load(
section["link_redirect_delay"] = str(args.link_redirect_delay) section["link_redirect_delay"] = str(args.link_redirect_delay)
if args.videos is not None: if args.videos is not None:
section["videos"] = "yes" if args.videos else "no" section["videos"] = "yes" if args.videos else "no"
if args.forums is not None:
section["forums"] = "yes" if args.forums else "no"
if args.http_timeout is not None: if args.http_timeout is not None:
section["http_timeout"] = str(args.http_timeout) section["http_timeout"] = str(args.http_timeout)

View File

@ -0,0 +1,54 @@
import argparse
import configparser
from pathlib import Path
from ..logging import log
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
SUBPARSER = SUBPARSERS.add_parser(
"kit-ipd",
parents=[CRAWLER_PARSER],
)
GROUP = SUBPARSER.add_argument_group(
title="kit ipd crawler arguments",
description="arguments for the 'kit-ipd' crawler",
)
GROUP.add_argument(
"--link-regex",
type=str,
metavar="REGEX",
help="href-matching regex to identify downloadable files"
)
GROUP.add_argument(
"target",
type=str,
metavar="TARGET",
help="url to crawl"
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain("Creating config for command 'kit-ipd'")
parser["crawl:kit-ipd"] = {}
section = parser["crawl:kit-ipd"]
load_crawler(args, section)
section["type"] = "kit-ipd"
section["target"] = str(args.target)
section["output_dir"] = str(args.output)
if args.link_regex:
section["link_regex"] = str(args.link_regex)
SUBPARSER.set_defaults(command=load)

View File

@ -181,6 +181,14 @@ PARSER.add_argument(
help="only execute a single crawler." help="only execute a single crawler."
" Can be specified multiple times to execute multiple crawlers" " Can be specified multiple times to execute multiple crawlers"
) )
PARSER.add_argument(
"--skip", "-S",
action="append",
type=str,
metavar="NAME",
help="don't execute this particular crawler."
" Can be specified multiple times to skip multiple crawlers"
)
PARSER.add_argument( PARSER.add_argument(
"--working-dir", "--working-dir",
type=Path, type=Path,

View File

@ -120,7 +120,7 @@ class Config:
# Using config.read_file instead of config.read because config.read # Using config.read_file instead of config.read because config.read
# would just ignore a missing file and carry on. # would just ignore a missing file and carry on.
try: try:
with open(path) as f: with open(path, encoding="utf-8") as f:
parser.read_file(f, source=str(path)) parser.read_file(f, source=str(path))
except FileNotFoundError: except FileNotFoundError:
raise ConfigLoadError(path, "File does not exist") raise ConfigLoadError(path, "File does not exist")
@ -128,6 +128,8 @@ class Config:
raise ConfigLoadError(path, "That's a directory, not a file") raise ConfigLoadError(path, "That's a directory, not a file")
except PermissionError: except PermissionError:
raise ConfigLoadError(path, "Insufficient permissions") raise ConfigLoadError(path, "Insufficient permissions")
except UnicodeDecodeError:
raise ConfigLoadError(path, "File is not encoded using UTF-8")
def dump(self, path: Optional[Path] = None) -> None: def dump(self, path: Optional[Path] = None) -> None:
""" """
@ -154,12 +156,12 @@ class Config:
try: try:
# x = open for exclusive creation, failing if the file already # x = open for exclusive creation, failing if the file already
# exists # exists
with open(path, "x") as f: with open(path, "x", encoding="utf-8") as f:
self._parser.write(f) self._parser.write(f)
except FileExistsError: except FileExistsError:
print("That file already exists.") print("That file already exists.")
if asyncio.run(prompt_yes_no("Overwrite it?", default=False)): if asyncio.run(prompt_yes_no("Overwrite it?", default=False)):
with open(path, "w") as f: with open(path, "w", encoding="utf-8") as f:
self._parser.write(f) self._parser.write(f)
else: else:
raise ConfigDumpError(path, "File already exists") raise ConfigDumpError(path, "File already exists")

View File

@ -5,6 +5,7 @@ from ..auth import Authenticator
from ..config import Config from ..config import Config
from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401 from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection
CrawlerConstructor = Callable[[ CrawlerConstructor = Callable[[
@ -19,4 +20,6 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
LocalCrawler(n, LocalCrawlerSection(s), c), LocalCrawler(n, LocalCrawlerSection(s), c),
"kit-ilias-web": lambda n, s, c, a: "kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a:
KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
} }

View File

@ -1,14 +1,14 @@
import asyncio import asyncio
import os import os
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections.abc import Awaitable, Coroutine
from datetime import datetime from datetime import datetime
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config, Section from ..config import Config, Section
from ..deduplicator import Deduplicator from ..deduplicator import Deduplicator
from ..limiter import Limiter
from ..logging import ProgressBar, log from ..logging import ProgressBar, log
from ..output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload from ..output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload
from ..report import MarkConflictError, MarkDuplicateError, Report from ..report import MarkConflictError, MarkDuplicateError, Report
@ -47,16 +47,18 @@ def noncritical(f: Wrapped) -> Wrapped:
try: try:
f(*args, **kwargs) f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e: except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
crawler.report.add_warning(str(e))
log.warn(str(e)) log.warn(str(e))
crawler.error_free = False crawler.error_free = False
except: # noqa: E722 do not use bare 'except' except Exception as e:
crawler.error_free = False crawler.error_free = False
crawler.report.add_error(str(e))
raise raise
return wrapper # type: ignore return wrapper # type: ignore
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]]) AWrapped = TypeVar("AWrapped", bound=Callable[..., Coroutine[Any, Any, Optional[Any]]])
def anoncritical(f: AWrapped) -> AWrapped: def anoncritical(f: AWrapped) -> AWrapped:
@ -72,29 +74,32 @@ def anoncritical(f: AWrapped) -> AWrapped:
Warning: Must only be applied to member functions of the Crawler class! Warning: Must only be applied to member functions of the Crawler class!
""" """
async def wrapper(*args: Any, **kwargs: Any) -> None: async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
if not (args and isinstance(args[0], Crawler)): if not (args and isinstance(args[0], Crawler)):
raise RuntimeError("@anoncritical must only applied to Crawler methods") raise RuntimeError("@anoncritical must only applied to Crawler methods")
crawler = args[0] crawler = args[0]
try: try:
await f(*args, **kwargs) return await f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e: except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
log.warn(str(e)) log.warn(str(e))
crawler.error_free = False crawler.error_free = False
except: # noqa: E722 do not use bare 'except' crawler.report.add_warning(str(e))
except Exception as e:
crawler.error_free = False crawler.error_free = False
crawler.report.add_error(str(e))
raise raise
return None
return wrapper # type: ignore return wrapper # type: ignore
class CrawlToken(ReusableAsyncContextManager[ProgressBar]): class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
def __init__(self, limiter: Limiter, path: PurePath): def __init__(self, path: PurePath):
super().__init__() super().__init__()
self._limiter = limiter
self._path = path self._path = path
@property @property
@ -103,17 +108,15 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
async def _on_aenter(self) -> ProgressBar: async def _on_aenter(self) -> ProgressBar:
self._stack.callback(lambda: log.status("[bold cyan]", "Crawled", fmt_path(self._path))) self._stack.callback(lambda: log.status("[bold cyan]", "Crawled", fmt_path(self._path)))
await self._stack.enter_async_context(self._limiter.limit_crawl())
bar = self._stack.enter_context(log.crawl_bar("[bold bright_cyan]", "Crawling", fmt_path(self._path))) bar = self._stack.enter_context(log.crawl_bar("[bold bright_cyan]", "Crawling", fmt_path(self._path)))
return bar return bar
class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath): def __init__(self, fs_token: FileSinkToken, path: PurePath):
super().__init__() super().__init__()
self._limiter = limiter
self._fs_token = fs_token self._fs_token = fs_token
self._path = path self._path = path
@ -122,7 +125,6 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
return self._path return self._path
async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]: async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
await self._stack.enter_async_context(self._limiter.limit_download())
sink = await self._stack.enter_async_context(self._fs_token) sink = await self._stack.enter_async_context(self._fs_token)
# The "Downloaded ..." message is printed in the output dir, not here # The "Downloaded ..." message is printed in the output dir, not here
bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading", bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading",
@ -228,12 +230,6 @@ class Crawler(ABC):
self.name = name self.name = name
self.error_free = True self.error_free = True
self._limiter = Limiter(
task_limit=section.tasks(),
download_limit=section.downloads(),
task_delay=section.task_delay(),
)
self._deduplicator = Deduplicator(section.windows_paths()) self._deduplicator = Deduplicator(section.windows_paths())
self._transformer = Transformer(section.transform()) self._transformer = Transformer(section.transform())
@ -281,7 +277,7 @@ class Crawler(ABC):
return None return None
log.explain("Answer: Yes") log.explain("Answer: Yes")
return CrawlToken(self._limiter, path) return CrawlToken(path)
async def download( async def download(
self, self,
@ -306,7 +302,7 @@ class Crawler(ABC):
return None return None
log.explain("Answer: Yes") log.explain("Answer: Yes")
return DownloadToken(self._limiter, fs_token, path) return DownloadToken(fs_token, path)
async def _cleanup(self) -> None: async def _cleanup(self) -> None:
log.explain_topic("Decision: Clean up files") log.explain_topic("Decision: Clean up files")
@ -318,6 +314,7 @@ class Crawler(ABC):
log.explain("Warnings or errors occurred during this run") log.explain("Warnings or errors occurred during this run")
log.explain("Answer: No") log.explain("Answer: No")
@anoncritical
async def run(self) -> None: async def run(self) -> None:
""" """
Start the crawling process. Call this function if you want to use a Start the crawling process. Call this function if you want to use a

View File

@ -1,12 +1,9 @@
import asyncio import asyncio
import http.cookies from http.cookiejar import LWPCookieJar
import ssl
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional from typing import Dict, List, Optional
import aiohttp import requests
import certifi
from aiohttp.client import ClientTimeout
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config from ..config import Config
@ -35,9 +32,9 @@ class HttpCrawler(Crawler):
self._authentication_id = 0 self._authentication_id = 0
self._authentication_lock = asyncio.Lock() self._authentication_lock = asyncio.Lock()
self._request_count = 0 self._http_timeout = section.http_timeout() # TODO Use or remove
self._http_timeout = section.http_timeout()
self._cookie_jar = LWPCookieJar()
self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE) self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE)
self._shared_cookie_jar_paths: Optional[List[Path]] = None self._shared_cookie_jar_paths: Optional[List[Path]] = None
self._shared_auth = shared_auth self._shared_auth = shared_auth
@ -57,7 +54,6 @@ class HttpCrawler(Crawler):
# This should reduce the amount of requests we make: If an authentication is in progress # This should reduce the amount of requests we make: If an authentication is in progress
# all future requests wait for authentication to complete. # all future requests wait for authentication to complete.
async with self._authentication_lock: async with self._authentication_lock:
self._request_count += 1
return self._authentication_id return self._authentication_id
async def authenticate(self, caller_auth_id: int) -> None: async def authenticate(self, caller_auth_id: int) -> None:
@ -106,32 +102,13 @@ class HttpCrawler(Crawler):
self._shared_cookie_jar_paths.append(self._cookie_jar_path) self._shared_cookie_jar_paths.append(self._cookie_jar_path)
def _load_cookies_from_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
with open(path) as f:
for i, line in enumerate(f):
# Names of headers are case insensitive
if line[:11].lower() == "set-cookie:":
jar.load(line[11:])
else:
log.explain(f"Line {i} doesn't start with 'Set-Cookie:', ignoring it")
self._cookie_jar.update_cookies(jar)
def _save_cookies_to_file(self, path: Path) -> None:
jar: Any = http.cookies.SimpleCookie()
for morsel in self._cookie_jar:
jar[morsel.key] = morsel
with open(path, "w") as f:
f.write(jar.output(sep="\n"))
f.write("\n") # A trailing newline is just common courtesy
def _load_cookies(self) -> None: def _load_cookies(self) -> None:
log.explain_topic("Loading cookies") log.explain_topic("Loading cookies")
cookie_jar_path: Optional[Path] = None cookie_jar_path: Optional[Path] = None
if self._shared_cookie_jar_paths is None: if self._shared_cookie_jar_paths is None:
log.explain("Not sharing any cookies") log.explain("Not sharing cookies")
cookie_jar_path = self._cookie_jar_path cookie_jar_path = self._cookie_jar_path
else: else:
log.explain("Sharing cookies") log.explain("Sharing cookies")
@ -154,46 +131,38 @@ class HttpCrawler(Crawler):
log.explain(f"Loading cookies from {fmt_real_path(cookie_jar_path)}") log.explain(f"Loading cookies from {fmt_real_path(cookie_jar_path)}")
try: try:
self._load_cookies_from_file(cookie_jar_path) self._cookie_jar.load(filename=str(cookie_jar_path))
except Exception as e: except Exception as e:
log.explain("Failed to load cookies") log.explain(f"Failed to load cookies: {e}")
log.explain(str(e)) log.explain("Proceeding without cookies")
def _save_cookies(self) -> None: def _save_cookies(self) -> None:
log.explain_topic("Saving cookies") log.explain_topic("Saving cookies")
try: try:
log.explain(f"Saving cookies to {fmt_real_path(self._cookie_jar_path)}") log.explain(f"Saving cookies to {fmt_real_path(self._cookie_jar_path)}")
self._save_cookies_to_file(self._cookie_jar_path) self._cookie_jar.save(filename=str(self._cookie_jar_path))
except Exception as e: except Exception as e:
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(f"Failed to save cookies: {e}")
log.warn(str(e))
async def run(self) -> None: async def run(self) -> None:
self._request_count = 0 self._request_count = 0
self._cookie_jar = aiohttp.CookieJar()
self._load_cookies() self._load_cookies()
async with aiohttp.ClientSession( self.session = requests.Session()
headers={"User-Agent": f"{NAME}/{VERSION}"}, self.session.headers["User-Agent"] = f"{NAME}/{VERSION}"
cookie_jar=self._cookie_jar,
connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), # From the request docs: "All requests code should work out of the box
timeout=ClientTimeout( # with externally provided instances of CookieJar, e.g. LWPCookieJar and
# 30 minutes. No download in the history of downloads was longer than 30 minutes. # FileCookieJar."
# This is enough to transfer a 600 MB file over a 3 Mib/s connection. # https://requests.readthedocs.io/en/latest/api/#requests.cookies.RequestsCookieJar
# Allowing an arbitrary value could be annoying for overnight batch jobs self.session.cookies = self._cookie_jar # type: ignore
total=15 * 60,
connect=self._http_timeout, with self.session:
sock_connect=self._http_timeout,
sock_read=self._http_timeout,
)
) as session:
self.session = session
try: try:
await super().run() await super().run()
finally: finally:
del self.session del self.session
log.explain_topic(f"Total amount of HTTP requests: {self._request_count}")
# They are saved in authenticate, but a final save won't hurt # They are saved in authenticate, but a final save won't hurt
self._save_cookies() self._save_cookies()

View File

@ -0,0 +1,91 @@
from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """
.ilc_text_block_Information {
background-color: #f5f7fa;
}
div.ilc_text_block_Standard {
margin-bottom: 10px;
margin-top: 10px;
}
span.ilc_text_inline_Strong {
font-weight: bold;
}
.accordion-head {
background-color: #f5f7fa;
padding: 0.5rem 0;
}
h3 {
margin-top: 0.5rem;
margin-bottom: 1rem;
}
br.visible-break {
margin-bottom: 1rem;
}
article {
margin: 0.5rem 0;
}
body {
padding: 1em;
grid-template-columns: 1fr min(60rem, 90%) 1fr;
line-height: 1.2;
}
"""
_ARTICLE_WORTHY_CLASSES = [
"ilc_text_block_Information",
"ilc_section_Attention",
"ilc_section_Link",
]
def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
head = soup.new_tag("head")
soup.insert(0, head)
simplecss_link: Tag = soup.new_tag("link")
# <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css">
simplecss_link["rel"] = "stylesheet"
simplecss_link["href"] = "https://cdn.simplecss.org/simple.css"
head.append(simplecss_link)
# Basic style tags for compat
style: Tag = soup.new_tag("style")
style.append(_STYLE_TAG_CONTENT)
head.append(style)
return soup
def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES):
block.name = "article"
for block in soup.find_all("h3"):
block.name = "div"
for block in soup.find_all("h1"):
block.name = "h3"
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"):
block.name = "h3"
block["class"] += ["accordion-head"]
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children)
if not children:
dummy.decompose()
if len(children) > 1:
continue
if type(children[0]) == Comment:
dummy.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr"))
return soup

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from enum import Enum from enum import Enum
from typing import List, Optional, Union from typing import Dict, List, Optional, Union
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -22,7 +22,9 @@ class IliasElementType(Enum):
FOLDER = "folder" FOLDER = "folder"
FORUM = "forum" FORUM = "forum"
LINK = "link" LINK = "link"
BOOKING = "booking"
MEETING = "meeting" MEETING = "meeting"
SURVEY = "survey"
VIDEO = "video" VIDEO = "video"
VIDEO_PLAYER = "video_player" VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder" VIDEO_FOLDER = "video_folder"
@ -37,6 +39,37 @@ class IliasPageElement:
mtime: Optional[datetime] = None mtime: Optional[datetime] = None
description: Optional[str] = None description: Optional[str] = None
def id(self) -> str:
regexes = [
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)"
]
for regex in regexes:
if match := re.search(regex, self.url):
return match.groupdict()["id"]
# Fall back to URL
log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
return self.url
@dataclass
class IliasDownloadForumData:
url: str
form_data: Dict[str, Union[str, List[str]]]
empty: bool
@dataclass
class IliasForumThread:
title: str
title_tag: Tag
content_tag: Tag
mtime: Optional[datetime]
class IliasPage: class IliasPage:
@ -59,16 +92,83 @@ class IliasPage:
if self._is_exercise_file(): if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements") log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries() return self._find_exercise_entries()
if self._is_personal_desktop():
log.explain("Page is the personal desktop, searching for elements")
return self._find_personal_desktop_entries()
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
log.explain("Page is a normal folder, searching for elements") log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries() return self._find_normal_entries()
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class)
if not paragraphs:
return None
# Extract bits and pieces into a string and parse it again.
# This ensures we don't miss anything and weird structures are resolved
# somewhat gracefully.
raw_html = ""
for p in paragraphs:
if p.find_parent(class_=is_interesting_class):
continue
# Ignore special listings (like folder groupings)
if "ilc_section_Special" in p["class"]:
continue
raw_html += str(p) + "\n"
raw_html = f"<body>\n{raw_html}\n</body>"
return BeautifulSoup(raw_html, "html.parser")
def get_download_forum_data(self) -> Optional[IliasDownloadForumData]:
form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x})
if not form:
return None
post_url = self._abs_url_from_relative(form["action"])
thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})]
form_data: Dict[str, Union[str, List[str]]] = {
"thread_ids[]": thread_ids,
"selected_cmd2": "html",
"select_cmd2": "Ausführen",
"selected_cmd": "",
}
return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0)
def get_next_stage_element(self) -> Optional[IliasPageElement]: def get_next_stage_element(self) -> Optional[IliasPageElement]:
if self._is_forum_page():
if "trows=800" in self._page_url:
return None
log.explain("Requesting *all* forum threads")
return self._get_show_max_forum_entries_per_page_url()
if self._is_ilias_opencast_embedding(): if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0] return self.get_child_elements()[0]
if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED: if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination")
return self._find_video_entries_paginated()[0] return self._find_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected():
return self._select_content_page_url()
return None return None
def _is_forum_page(self) -> bool:
read_more_btn = self._soup.find(
"button",
attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x}
)
return read_more_btn is not None
def _is_video_player(self) -> bool: def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup) return "paella_config_file" in str(self._soup)
@ -103,13 +203,53 @@ class IliasPage:
return False return False
def _is_personal_desktop(self) -> bool:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
def _is_content_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=copa_" in link.get("value")
def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = self._soup.find("a", attrs={"href": lambda x: x and "crs_next_sess=1" in x})
if not element:
return None
link = self._abs_url_from_link(element)
return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings")
def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find(
id="tab_view_content",
attrs={"class": lambda x: x is not None and "active" not in x}
)
# Already selected (or not found)
if not tab:
return None
link = tab.find("a")
if link:
link = self._abs_url_from_link(link)
return IliasPageElement(IliasElementType.FOLDER, link, "select content page")
_unexpected_html_warning()
log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.")
log.warn_contd("PFERD might not find content on the course's main page.")
return None
def _player_to_video(self) -> List[IliasPageElement]: def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript # Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere # player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player # on the page, but defined in a JS object inside a script tag, passed to the player
# library. # library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source # We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex: re.Pattern[str] = re.compile( regex = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
) )
json_match = regex.search(str(self._soup)) json_match = regex.search(str(self._soup))
@ -121,10 +261,77 @@ class IliasPage:
# parse it # parse it
json_object = json.loads(json_str) json_object = json.loads(json_str)
# and fetch the video url! streams = [stream for stream in json_object["streams"]]
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)] return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
return items
def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]:
correct_link = self._soup.find(
"a",
attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x}
)
if not correct_link:
return None
link = self._abs_url_from_link(correct_link)
return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads")
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select(".il-item-title")
for title in titles:
link = title.find("a")
name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link)
type = self._find_type_from_link(name, link, url)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {link}")
continue
log.explain(f"Found {name!r}")
if type == IliasElementType.FILE and "_download" not in url:
url = re.sub(r"(target=file_\d+)", r"\1_download", url)
log.explain("Rewired file URL to include download part")
items.append(IliasPageElement(type, url, name))
return items
def _find_copa_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink")
for link in links:
url = self._abs_url_from_link(link)
name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
if "file_id" not in url:
_unexpected_html_warning()
log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
continue
items.append(IliasPageElement(IliasElementType.FILE, url, name))
return items
def _find_video_entries(self) -> List[IliasPageElement]: def _find_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages # ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing # 1. The initial dummy page without any videos. This page contains the link to the listing
@ -182,7 +389,7 @@ class IliasPage:
""" """
# Video start links are marked with an "Abspielen" link # Video start links are marked with an "Abspielen" link
video_links: List[Tag] = self._soup.findAll( video_links: List[Tag] = self._soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*") name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")
) )
results: List[IliasPageElement] = [] results: List[IliasPageElement] = []
@ -194,11 +401,22 @@ class IliasPage:
def _listed_video_to_element(self, link: Tag) -> IliasPageElement: def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata. # The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string # 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
modification_time = None
row: Tag = link.parent.parent.parent
column_count = len(row.select("td.std"))
for index in range(column_count, 0, -1):
modification_string = link.parent.parent.parent.select_one( modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)" f"td.std:nth-child({index})"
).getText().strip() ).getText().strip()
if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M") modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
break
if modification_time is None:
log.warn(f"Could not determine upload time for {link}")
modification_time = datetime.now()
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
title += ".mp4" title += ".mp4"
@ -331,6 +549,12 @@ class IliasPage:
element_type = self._find_type_from_link(element_name, link, abs_url) element_type = self._find_type_from_link(element_name, link, abs_url)
description = self._find_link_description(link) description = self._find_link_description(link)
# The last meeting on every page is expanded by default.
# Its content is then shown inline *and* in the meeting page itself.
# We should skip the inline content.
if element_type != IliasElementType.MEETING and self._is_in_expanded_meeting(link):
continue
if not element_type: if not element_type:
continue continue
if element_type == IliasElementType.MEETING: if element_type == IliasElementType.MEETING:
@ -344,8 +568,30 @@ class IliasPage:
log.explain(f"Found {element_name!r}") log.explain(f"Found {element_name!r}")
result.append(IliasPageElement(element_type, abs_url, element_name, description=description)) result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
return result return result
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
"""
Returns whether a file is part of an expanded meeting.
Has false positives for meetings themselves as their title is also "in the expanded meeting content".
It is in the same general div and this whole thing is guesswork.
Therefore, you should check for meetings before passing them in this function.
"""
parents: List[Tag] = list(tag.parents)
for parent in parents:
if not parent.get("class"):
continue
# We should not crawl files under meetings
if "ilContainerListItemContentCB" in parent.get("class"):
link: Tag = parent.parent.find("a")
type = IliasPage._find_type_from_folder_like(link, self._page_url)
return type == IliasElementType.MEETING
return False
def _find_upwards_folder_hierarchy(self, tag: Tag) -> List[str]: def _find_upwards_folder_hierarchy(self, tag: Tag) -> List[str]:
""" """
Interprets accordions and expandable blocks as virtual folders and returns them Interprets accordions and expandable blocks as virtual folders and returns them
@ -371,7 +617,10 @@ class IliasPage:
continue continue
prev: Tag = parent.findPreviousSibling("div") prev: Tag = parent.findPreviousSibling("div")
if "ilContainerBlockHeader" in prev.get("class"): if "ilContainerBlockHeader" in prev.get("class"):
if prev.find("h3"):
found_titles.append(prev.find("h3").getText().strip()) found_titles.append(prev.find("h3").getText().strip())
else:
found_titles.append(prev.find("h2").getText().strip())
# And this for real accordions # And this for real accordions
if "il_VAccordionContentDef" in parent.get("class"): if "il_VAccordionContentDef" in parent.get("class"):
@ -426,6 +675,100 @@ class IliasPage:
log.explain(f"Found file {full_path!r}") log.explain(f"Found file {full_path!r}")
return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date) return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
def _find_cards(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
card_titles: List[Tag] = self._soup.select(".card-title a")
for title in card_titles:
url = self._abs_url_from_link(title)
name = _sanitize_path_name(title.getText().strip())
type = self._find_type_from_card(title)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {title}")
continue
result.append(IliasPageElement(type, url, name))
card_button_tiles: List[Tag] = self._soup.select(".card-title button")
for button in card_button_tiles:
regex = re.compile(button["id"] + r".*window.open\(['\"](.+?)['\"]")
res = regex.search(str(self._soup))
if not res:
_unexpected_html_warning()
log.warn_contd(f"Could not find click handler target for {button}")
continue
url = self._abs_url_from_relative(res.group(1))
name = _sanitize_path_name(button.getText().strip())
type = self._find_type_from_card(button)
caption_parent = button.findParent(
"div",
attrs={"class": lambda x: x and "caption" in x},
)
caption_container = caption_parent.find_next_sibling("div")
if caption_container:
description = caption_container.getText().strip()
else:
description = None
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {button}")
continue
result.append(IliasPageElement(type, url, name, description=description))
return result
def _find_type_from_card(self, card_title: Tag) -> Optional[IliasElementType]:
def is_card_root(element: Tag) -> bool:
return "il-card" in element["class"] and "thumbnail" in element["class"]
card_root: Optional[Tag] = None
# We look for the card root
for parent in card_title.parents:
if is_card_root(parent):
card_root = parent
break
if card_root is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {card_title}")
return None
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"] or "xoct" in icon["class"]:
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
return IliasElementType.LINK
if "book" in icon["class"]:
return IliasElementType.BOOKING
if "frm" in icon["class"]:
return IliasElementType.FORUM
if "sess" in icon["class"]:
return IliasElementType.MEETING
if "tst" in icon["class"]:
return IliasElementType.TEST
if "fold" in icon["class"]:
return IliasElementType.FOLDER
if "copa" in icon["class"]:
return IliasElementType.FOLDER
if "svy" in icon["class"]:
return IliasElementType.SURVEY
if "file" in icon["class"]:
return IliasElementType.FILE
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
return None
@staticmethod @staticmethod
def _find_type_from_link( def _find_type_from_link(
element_name: str, element_name: str,
@ -441,9 +784,30 @@ class IliasPage:
if "target=file_" in parsed_url.query: if "target=file_" in parsed_url.query:
return IliasElementType.FILE return IliasElementType.FILE
if "target=grp_" in parsed_url.query:
return IliasElementType.FOLDER
if "target=crs_" in parsed_url.query:
return IliasElementType.FOLDER
if "baseClass=ilExerciseHandlerGUI" in parsed_url.query:
return IliasElementType.EXERCISE
if "baseClass=ilLinkResourceHandlerGUI" in parsed_url.query and "calldirectlink" in parsed_url.query:
return IliasElementType.LINK
if "cmd=showThreads" in parsed_url.query or "target=frm_" in parsed_url.query:
return IliasElementType.FORUM
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
# Everything with a ref_id can *probably* be opened to reveal nested things # Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc # video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query: if "ref_id=" in parsed_url.query or "goto.php" in parsed_url.path:
return IliasPage._find_type_from_folder_like(link_element, url) return IliasPage._find_type_from_folder_like(link_element, url)
_unexpected_html_warning() _unexpected_html_warning()
@ -464,7 +828,7 @@ class IliasPage:
# We look for the outer div of our inner link, to find information around it # We look for the outer div of our inner link, to find information around it
# (mostly the icon) # (mostly the icon)
for parent in link_element.parents: for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]: if "ilContainerListItemOuter" in parent["class"] or "il-std-item" in parent["class"]:
found_parent = parent found_parent = parent
break break
@ -476,6 +840,13 @@ class IliasPage:
# Find the small descriptive icon to figure out the type # Find the small descriptive icon to figure out the type
img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon") img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
if img_tag is None and found_parent.find("a", attrs={"href": lambda x: x and "crs_next_sess=" in x}):
log.explain("Found session expansion button, skipping it as it has no content")
return None
if img_tag is None: if img_tag is None:
_unexpected_html_warning() _unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}") log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}")
@ -490,6 +861,9 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_webr.svg"): if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.LINK return IliasElementType.LINK
if str(img_tag["src"]).endswith("icon_book.svg"):
return IliasElementType.BOOKING
if str(img_tag["src"]).endswith("frm.svg"): if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM return IliasElementType.FORUM
@ -507,23 +881,38 @@ class IliasPage:
Normalizes meeting names, which have a relative time as their first part, Normalizes meeting names, which have a relative time as their first part,
to their date in ISO format. to their date in ISO format.
""" """
date_portion_str = meeting_name.split(" - ")[0]
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
split_delimiter = ", "
# We have a meeting day without time
date_portion_str = meeting_name.split(split_delimiter)[0]
date_portion = demangle_date(date_portion_str) date_portion = demangle_date(date_portion_str)
# We failed to parse the date, bail out
if not date_portion: if not date_portion:
return meeting_name return meeting_name
rest_of_name = meeting_name # Replace the first section with the absolute date
if rest_of_name.startswith(date_portion_str): rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
rest_of_name = rest_of_name[len(date_portion_str):] return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name
def _abs_url_from_link(self, link_tag: Tag) -> str: def _abs_url_from_link(self, link_tag: Tag) -> str:
""" """
Create an absolute url from an <a> tag. Create an absolute url from an <a> tag.
""" """
return urljoin(self._page_url, link_tag.get("href")) return self._abs_url_from_relative(link_tag.get("href"))
def _abs_url_from_relative(self, relative_url: str) -> str:
"""
Create an absolute url from a relative URL.
"""
return urljoin(self._page_url, relative_url)
def _unexpected_html_warning() -> None: def _unexpected_html_warning() -> None:
@ -536,35 +925,47 @@ english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep',
def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[datetime]: def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[datetime]:
""" """
Demangle a given date in one of the following formats: Demangle a given date in one of the following formats (hour/minute part is optional):
"Gestern, HH:MM" "Gestern, HH:MM"
"Heute, HH:MM" "Heute, HH:MM"
"Morgen, HH:MM" "Morgen, HH:MM"
"dd. mon yyyy, HH:MM "dd. mon yyyy, HH:MM
""" """
try: try:
# Normalize whitespace because users
date_str = re.sub(r"\s+", " ", date_str) date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I) date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I) date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I) date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = date_str.strip()
for german, english in zip(german_months, english_months): for german, english in zip(german_months, english_months):
date_str = date_str.replace(german, english) date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020" # Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english) date_str = date_str.replace(english + ".", english)
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm" # We now have a nice english String in the format: "dd. mmm yyyy, hh:mm" or "dd. mmm yyyy"
# Check if we have a time as well
if ", " in date_str:
day_part, time_part = date_str.split(",") day_part, time_part = date_str.split(",")
else:
day_part = date_str.split(",")[0]
time_part = None
day_str, month_str, year_str = day_part.split(" ") day_str, month_str, year_str = day_part.split(" ")
day = int(day_str.strip().replace(".", "")) day = int(day_str.strip().replace(".", ""))
month = english_months.index(month_str.strip()) + 1 month = english_months.index(month_str.strip()) + 1
year = int(year_str.strip()) year = int(year_str.strip())
if time_part:
hour_str, minute_str = time_part.split(":") hour_str, minute_str = time_part.split(":")
hour = int(hour_str) hour = int(hour_str)
minute = int(minute_str) minute = int(minute_str)
return datetime(year, month, day, hour, minute) return datetime(year, month, day, hour, minute)
return datetime(year, month, day)
except Exception: except Exception:
if not fail_silently: if not fail_silently:
log.warn(f"Date parsing failed for {date_str!r}") log.warn(f"Date parsing failed for {date_str!r}")
@ -586,3 +987,45 @@ def _tomorrow() -> date:
def _sanitize_path_name(name: str) -> str: def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-").strip() return name.replace("/", "-").replace("\\", "-").strip()
def parse_ilias_forum_export(forum_export: BeautifulSoup) -> List[IliasForumThread]:
elements = []
for p in forum_export.select("body > p"):
title_tag = p
content_tag = p.find_next_sibling("ul")
if not content_tag:
# ILIAS allows users to delete the initial post while keeping the thread open
# This produces empty threads without *any* content.
# I am not sure why you would want this, but ILIAS makes it easy to do.
continue
title = p.find("b").text
if ":" in title:
title = title[title.find(":") + 1:]
title = title.strip()
mtime = _guess_timestamp_from_forum_post_content(content_tag)
elements.append(IliasForumThread(title, title_tag, content_tag, mtime))
return elements
def _guess_timestamp_from_forum_post_content(content: Tag) -> Optional[datetime]:
posts: Optional[Tag] = content.select(".ilFrmPostHeader > span.small")
if not posts:
return None
newest_date: Optional[datetime] = None
for post in posts:
text = post.text.strip()
text = text[text.rfind("|") + 1:]
date = demangle_date(text, fail_silently=True)
if not date:
continue
if not newest_date or newest_date < date:
newest_date = date
return newest_date

View File

@ -1,9 +1,11 @@
import asyncio import asyncio
import re import re
from collections.abc import Awaitable, Coroutine
from pathlib import PurePath from pathlib import PurePath
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, TypeVar, Union from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
import aiohttp import aiohttp
import yarl
from aiohttp import hdrs from aiohttp import hdrs
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -12,13 +14,21 @@ from ...config import Config
from ...logging import ProgressBar, log from ...logging import ProgressBar, log
from ...output_dir import FileSink, Redownload from ...output_dir import FileSink, Redownload
from ...utils import fmt_path, soupify, url_set_query_param from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import CrawlError, CrawlWarning, anoncritical from ..crawler import AWrapped, CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .file_templates import Links from .file_templates import Links
from .kit_ilias_html import IliasElementType, IliasPage, IliasPageElement from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasPage, IliasPageElement,
_sanitize_path_name, parse_ilias_forum_export)
TargetType = Union[str, int] TargetType = Union[str, int]
_ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful():
pass
class KitIliasWebCrawlerSection(HttpCrawlerSection): class KitIliasWebCrawlerSection(HttpCrawlerSection):
def target(self) -> TargetType: def target(self) -> TargetType:
@ -32,7 +42,7 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
if target == "desktop": if target == "desktop":
# Full personal desktop # Full personal desktop
return target return target
if target.startswith("https://ilias.studium.kit.edu"): if target.startswith(_ILIAS_URL):
# ILIAS URL # ILIAS URL
return target return target
@ -64,6 +74,9 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
def videos(self) -> bool: def videos(self) -> bool:
return self.s.getboolean("videos", fallback=False) return self.s.getboolean("videos", fallback=False)
def forums(self) -> bool:
return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = set([ _DIRECTORY_PAGES: Set[IliasElementType] = set([
IliasElementType.EXERCISE, IliasElementType.EXERCISE,
@ -81,17 +94,14 @@ _VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
]) ])
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]])
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
def decorator(f: AWrapped) -> AWrapped: def decorator(f: AWrapped) -> AWrapped:
async def wrapper(*args: Any, **kwargs: Any) -> None: async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
last_exception: Optional[BaseException] = None last_exception: Optional[BaseException] = None
for round in range(attempts): for round in range(attempts):
try: try:
await f(*args, **kwargs) return await f(*args, **kwargs)
return
except aiohttp.ContentTypeError: # invalid content type except aiohttp.ContentTypeError: # invalid content type
raise CrawlWarning("ILIAS returned an invalid content type") raise CrawlWarning("ILIAS returned an invalid content type")
except aiohttp.TooManyRedirects: except aiohttp.TooManyRedirects:
@ -106,6 +116,9 @@ def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
if last_exception: if last_exception:
message = f"Error in I/O Operation: {last_exception}" message = f"Error in I/O Operation: {last_exception}"
if failure_is_error:
raise CrawlError(message) from last_exception
else:
raise CrawlWarning(message) from last_exception raise CrawlWarning(message) from last_exception
raise CrawlError("Impossible return in ilias _iorepeat") raise CrawlError("Impossible return in ilias _iorepeat")
@ -113,13 +126,6 @@ def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
return decorator return decorator
def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
"""
Wraps any I/O exception in a CrawlWarning.
"""
return _iorepeat(1, name)
# Crawler control flow: # Crawler control flow:
# #
# crawl_desktop -+ # crawl_desktop -+
@ -174,12 +180,14 @@ instance's greatest bottleneck.
section.tfa_auth(authenticators), section.tfa_auth(authenticators),
) )
self._base_url = "https://ilias.studium.kit.edu" self._base_url = _ILIAS_URL
self._target = section.target() self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay() self._link_file_redirect_delay = section.link_redirect_delay()
self._links = section.links() self._links = section.links()
self._videos = section.videos() self._videos = section.videos()
self._forums = section.forums()
self._visited_urls: Dict[str, PurePath] = dict()
async def _run(self) -> None: async def _run(self) -> None:
if isinstance(self._target, int): if isinstance(self._target, int):
@ -201,7 +209,9 @@ instance's greatest bottleneck.
await self._crawl_url(root_url, expected_id=course_id) await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None: async def _crawl_desktop(self) -> None:
await self._crawl_url(self._base_url) appendix = r"ILIAS\PersonalDesktop\PDMainBarProvider|mm_pd_sel_items"
appendix = appendix.encode("ASCII").hex()
await self._crawl_url(self._base_url + "/gs_content.php?item=" + appendix)
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath(".")) maybe_cl = await self.crawl(PurePath("."))
@ -209,51 +219,33 @@ instance's greatest bottleneck.
return return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608 cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = [] def ensure_is_valid_course_id(parent: Optional[IliasPageElement], soup: BeautifulSoup) -> None:
if parent is None and expected_id is not None:
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
soup = await self._get_page(url)
if expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link") perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"): if not perma_link_element or "crs_" not in perma_link_element.get("value"):
raise CrawlError("Invalid course id? Didn't find anything looking like a course") raise CrawlError("Invalid course id? Didn't find anything looking like a course")
# Duplicated code, but the root page is special - we want to avoid fetching it twice! await self._crawl_ilias_page(url, None, cl, ensure_is_valid_course_id)
log.explain_topic("Parsing root HTML page")
log.explain(f"URL: {url}")
page = IliasPage(soup, url, None)
elements.extend(page.get_child_elements())
# Fill up our task list with the found elements @anoncritical
await gather_elements() async def _crawl_ilias_page(
tasks = [self._handle_ilias_element(PurePath("."), element) for element in elements] self,
url: str,
# And execute them parent: Optional[IliasPageElement],
await self.gather(tasks) cl: CrawlToken,
next_stage_hook: Callable[[Optional[IliasPageElement], BeautifulSoup], None] = lambda a, b: None
async def _handle_ilias_page(self, url: str, parent: IliasPageElement, path: PurePath) -> None: ) -> None:
maybe_cl = await self.crawl(path)
if not maybe_cl:
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
@_iorepeat(3, "crawling folder")
async def gather_elements() -> None:
elements.clear()
async with cl: async with cl:
next_stage_url: Optional[str] = url next_stage_url: Optional[str] = url
current_parent = parent current_parent = parent
while next_stage_url: while next_stage_url:
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(path)}") log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}") log.explain(f"URL: {next_stage_url}")
next_stage_hook(current_parent, soup)
page = IliasPage(soup, next_stage_url, current_parent) page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element(): if next_element := page.get_next_stage_element():
current_parent = next_element current_parent = next_element
@ -261,56 +253,94 @@ instance's greatest bottleneck.
else: else:
next_stage_url = None next_stage_url = None
elements.extend(page.get_child_elements()) for element in sorted(page.get_child_elements(), key=lambda e: e.id()):
await self._handle_ilias_element(cl.path, element)
# Fill up our task list with the found elements if description_string := page.get_description():
await gather_elements() await self._download_description(cl.path, description_string)
tasks = [self._handle_ilias_element(cl.path, element) for element in elements]
# And execute them
await self.gather(tasks)
# These decorators only apply *to this method* and *NOT* to the returned
# awaitables!
# This method does not await the handlers but returns them instead.
# This ensures one level is handled at a time and name deduplication
# works correctly.
@anoncritical @anoncritical
# Shouldn't happen but we also really don't want to let I/O errors bubble up to anoncritical. async def _handle_ilias_element(
# If that happens we will be terminated as anoncritical doesn't tream them as non-critical. self,
@_wrap_io_in_warning("handling ilias element") parent_path: PurePath,
async def _handle_ilias_element(self, parent_path: PurePath, element: IliasPageElement) -> None: element: IliasPageElement,
) -> None:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
element_path = PurePath(parent_path, element.name) element_path = PurePath(parent_path, element.name)
if element.type in _VIDEO_ELEMENTS: if element.type in _VIDEO_ELEMENTS:
log.explain_topic(f"Decision: Crawl video element {fmt_path(element_path)}")
if not self._videos: if not self._videos:
log.explain("Video crawling is disabled") log.status(
log.explain("Answer: no") "[bold bright_black]",
return "Ignored",
else: fmt_path(element_path),
log.explain("Video crawling is enabled") "[bright_black](enable with option 'videos')"
log.explain("Answer: yes") )
return None
if element.type == IliasElementType.FILE: if element.type == IliasElementType.FILE:
await self._download_file(element, element_path) await self._handle_file(element, element_path)
elif element.type == IliasElementType.FORUM: elif element.type == IliasElementType.FORUM:
log.explain_topic(f"Decision: Crawl {fmt_path(element_path)}") if not self._forums:
log.explain("Forums are not supported") log.status(
log.explain("Answer: No") "[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'forums')"
)
return None
await self._handle_forum(element, element_path)
elif element.type == IliasElementType.TEST: elif element.type == IliasElementType.TEST:
log.explain_topic(f"Decision: Crawl {fmt_path(element_path)}") log.status(
log.explain("Tests contain no relevant files") "[bold bright_black]",
log.explain("Answer: No") "Ignored",
fmt_path(element_path),
"[bright_black](tests contain no relevant data)"
)
return None
elif element.type == IliasElementType.SURVEY:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](surveys contain no relevant data)"
)
return None
elif element.type == IliasElementType.LINK: elif element.type == IliasElementType.LINK:
await self._download_link(element, element_path) await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
await self._handle_booking(element, element_path)
elif element.type == IliasElementType.VIDEO: elif element.type == IliasElementType.VIDEO:
await self._download_file(element, element_path) await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER: elif element.type == IliasElementType.VIDEO_PLAYER:
await self._download_video(element, element_path) await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES: elif element.type in _DIRECTORY_PAGES:
await self._handle_ilias_page(element.url, element, element_path) maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
await self._crawl_ilias_page(element.url, element, maybe_cl)
else: else:
# This will retry it a few times, failing everytime. It doesn't make any network # This will retry it a few times, failing everytime. It doesn't make any network
# requests, so that's fine. # requests, so that's fine.
raise CrawlWarning(f"Unknown element type: {element.type!r}") raise CrawlWarning(f"Unknown element type: {element.type!r}")
async def _download_link(self, element: IliasPageElement, element_path: PurePath) -> None: async def _handle_link(
self,
element: IliasPageElement,
element_path: PurePath,
) -> None:
log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}") log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}") log.explain(f"Links type is {self._links}")
@ -318,32 +348,87 @@ instance's greatest bottleneck.
link_extension = self._links.extension() link_extension = self._links.extension()
if not link_template_maybe or not link_extension: if not link_template_maybe or not link_extension:
log.explain("Answer: No") log.explain("Answer: No")
return return None
else: else:
log.explain("Answer: Yes") log.explain("Answer: Yes")
link_template = link_template_maybe
element_path = element_path.with_name(element_path.name + link_extension) element_path = element_path.with_name(element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime) maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl: if not maybe_dl:
return return None
dl = maybe_dl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
await self._download_link(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(3, "resolving link") @_iorepeat(3, "resolving link")
async def impl() -> None: async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None:
async with dl as (bar, sink): async with dl as (bar, sink):
export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML") export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML")
real_url = await self._resolve_link_target(export_url) real_url = await self._resolve_link_target(export_url)
self._write_link_content(link_template, real_url, element.name, element.description, sink)
def _write_link_content(
self,
link_template: str,
url: str,
name: str,
description: Optional[str],
sink: FileSink,
) -> None:
content = link_template content = link_template
content = content.replace("{{link}}", real_url) content = content.replace("{{link}}", url)
content = content.replace("{{name}}", element.name) content = content.replace("{{name}}", name)
content = content.replace("{{description}}", str(element.description)) content = content.replace("{{description}}", str(description))
content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay)) content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay))
sink.file.write(content.encode("utf-8")) sink.file.write(content.encode("utf-8"))
sink.done() sink.done()
await impl() async def _handle_booking(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
log.explain_topic(f"Decision: Crawl Booking Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
link_template_maybe = self._links.template()
link_extension = self._links.extension()
if not link_template_maybe or not link_extension:
log.explain("Answer: No")
return None
else:
log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(1, "downloading description")
async def _download_description(self, parent_path: PurePath, description: BeautifulSoup) -> None:
path = parent_path / "Description.html"
dl = await self.download(path, redownload=Redownload.ALWAYS)
if not dl:
return
async with dl as (bar, sink):
description = clean(insert_base_markup(description))
sink.file.write(description.prettify().encode("utf-8"))
sink.done()
@anoncritical
@_iorepeat(3, "resolving booking")
async def _download_booking(
self,
element: IliasPageElement,
link_template: str,
dl: DownloadToken,
) -> None:
async with dl as (bar, sink):
self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str: async def _resolve_link_target(self, export_url: str) -> str:
async with self.session.get(export_url, allow_redirects=False) as resp: async with self.session.get(export_url, allow_redirects=False) as resp:
@ -351,7 +436,7 @@ instance's greatest bottleneck.
if hdrs.LOCATION not in resp.headers: if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip() return soupify(await resp.read()).select_one("a").get("href").strip()
self._authenticate() await self._authenticate()
async with self.session.get(export_url, allow_redirects=False) as resp: async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated # No redirect means we were authenticated
@ -360,40 +445,131 @@ instance's greatest bottleneck.
raise CrawlError("resolve_link_target failed even after authenticating") raise CrawlError("resolve_link_target failed even after authenticating")
async def _download_video(self, element: IliasPageElement, element_path: PurePath) -> None: async def _handle_video(
# Videos will NOT be redownloaded - their content doesn't really change and they are chunky self,
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.NEVER) element: IliasPageElement,
if not maybe_dl: element_path: PurePath,
return ) -> None:
dl = maybe_dl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608 # Copy old mapping as it is likely still relevant
if self.prev_report:
self.report.add_custom_value(
str(element_path),
self.prev_report.get_custom_value(str(element_path))
)
# A video might contain other videos, so let's "crawl" the video first
# to ensure rate limits apply. This must be a download as *this token*
# is re-used if the video consists of a single stream. In that case the
# file name is used and *not* the stream name the ilias html parser reported
# to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# during dleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
await self.download(video)
return None
await self._download_video(element_path, element, maybe_dl)
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(str(video_path))
if not custom_value:
return []
names = cast(List[str], custom_value)
folder = video_path.parent
return [PurePath(folder, name) for name in names]
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._to_local_video_path(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
return True
log.explain("Missing at least one video, continuing with requests!")
return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical
@_iorepeat(3, "downloading video") @_iorepeat(3, "downloading video")
async def impl() -> None: async def _download_video(
assert dl # The function is only reached when dl is not None self,
original_path: PurePath,
element: IliasPageElement,
dl: DownloadToken
) -> None:
stream_elements: List[IliasPageElement] = []
async with dl as (bar, sink): async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element) page = IliasPage(await self._get_page(element.url), element.url, element)
real_element = page.get_child_elements()[0] stream_elements = page.get_child_elements()
log.explain(f"Streaming video from real url {real_element.url}") if len(stream_elements) > 1:
log.explain(f"Found multiple video streams for {element.name}")
else:
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
await self._stream_from_url(real_element.url, sink, bar, is_video=True) transformed_path = self._to_local_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
await impl() # We do not have a local cache yet
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), [original_path.name])
return
async def _download_file(self, element: IliasPageElement, element_path: PurePath) -> None: contained_video_paths: List[str] = []
for stream_element in stream_elements:
video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
continue
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), contained_video_paths)
async def _handle_file(
self,
element: IliasPageElement,
element_path: PurePath,
) -> None:
maybe_dl = await self.download(element_path, mtime=element.mtime) maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl: if not maybe_dl:
return return None
dl = maybe_dl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608 await self._download_file(element, maybe_dl)
@anoncritical
@_iorepeat(3, "downloading file") @_iorepeat(3, "downloading file")
async def impl() -> None: async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None assert dl # The function is only reached when dl is not None
async with dl as (bar, sink): async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False) await self._stream_from_url(element.url, sink, bar, is_video=False)
await impl()
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None: async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool: async def try_stream() -> bool:
async with self.session.get(url, allow_redirects=is_video) as resp: async with self.session.get(url, allow_redirects=is_video) as resp:
@ -424,6 +600,72 @@ instance's greatest bottleneck.
if not await try_stream(): if not await try_stream():
raise CrawlError("File streaming failed after authenticate()") raise CrawlError("File streaming failed after authenticate()")
async def _handle_forum(
self,
element: IliasPageElement,
element_path: PurePath,
) -> None:
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
await self._crawl_forum(element, maybe_cl)
@_iorepeat(3, "crawling forum")
@anoncritical
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasForumThread] = []
async with cl:
next_stage_url = element.url
while next_stage_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, None)
if next := page.get_next_stage_element():
next_stage_url = next.url
else:
break
download_data = page.get_download_forum_data()
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
elements = []
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
elements.sort(key=lambda elem: elem.title)
tasks: List[Awaitable[None]] = []
for elem in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem)))
# And execute them
await self.gather(tasks)
@anoncritical
@_iorepeat(3, "saving forum thread")
async def _download_forum_thread(
self,
parent_path: PurePath,
element: IliasForumThread,
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
content = element.title_tag.prettify()
content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8"))
sink.done()
async def _get_page(self, url: str) -> BeautifulSoup: async def _get_page(self, url: str) -> BeautifulSoup:
auth_id = await self._current_auth_id() auth_id = await self._current_auth_id()
async with self.session.get(url) as request: async with self.session.get(url) as request:
@ -441,18 +683,49 @@ instance's greatest bottleneck.
return soup return soup
raise CrawlError("get_page failed even after authenticating") raise CrawlError("get_page failed even after authenticating")
async def _post_authenticated(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> BeautifulSoup:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
async with self.session.post(url, data=form_data(), allow_redirects=False) as request:
if request.status == 200:
return await request.read()
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.post(url, data=data, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
# We repeat this as the login method in shibboleth doesn't handle I/O errors. # We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here. # Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login") @ _iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None: async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session) await self._shibboleth_login.login(self.session)
@staticmethod @ staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool: def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages # Normal ILIAS pages
userlog = soup.find("li", {"id": "userlog"}) mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if userlog is not None: if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True return True
# Video listing embeds do not have complete ILIAS html. Try to match them by # Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table # their video listing table
video_table = soup.find( video_table = soup.find(
@ -490,14 +763,17 @@ class KitShibbolethLogin:
# Equivalent: Click on "Mit KIT-Account anmelden" button in # Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php # https://ilias.studium.kit.edu/login.php
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login" url = f"{_ILIAS_URL}/shib_login.php"
data = { data = {
"sendLogin": "1", "sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth", "idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php", "il_target": "",
"home_organization_selection": "Mit KIT-Account anmelden", "home_organization_selection": "Weiter",
} }
soup: BeautifulSoup = await _post(sess, url, data) soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary # Attempt to login using credentials, if necessary
while not self._login_successful(soup): while not self._login_successful(soup):
@ -520,6 +796,12 @@ class KitShibbolethLogin:
} }
soup = await _post(sess, url, data) soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup): if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup) soup = await self._authenticate_tfa(sess, soup)
@ -530,7 +812,7 @@ class KitShibbolethLogin:
# (or clicking "Continue" if you have JS disabled) # (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"}) relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"}) saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST" url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"], "RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"], "SAMLResponse": saml_response["value"],
@ -577,3 +859,51 @@ class KitShibbolethLogin:
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup: async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response: async with session.post(url, data=data) as response:
return soupify(await response.read()) return soupify(await response.read())
async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
aiohttp unescapes '/' and ':' in URL query parameters which is not RFC compliant and rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling location header was a good idea??
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())

View File

@ -0,0 +1,165 @@
import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import List, Optional, Pattern, Set, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
from ..config import Config
from ..logging import ProgressBar, log
from ..output_dir import FileSink
from ..utils import soupify
from .crawler import CrawlError
from .http_crawler import HttpCrawler, HttpCrawlerSection
class KitIpdCrawlerSection(HttpCrawlerSection):
def target(self) -> str:
target = self.s.get("target")
if not target:
self.missing_value("target")
if not target.startswith("https://"):
self.invalid_value("target", target, "Should be a URL")
return target
def link_regex(self) -> Pattern[str]:
regex = self.s.get("link_regex", r"^.*?[^/]+\.(pdf|zip|c|cpp|java)$")
return re.compile(regex)
@dataclass(unsafe_hash=True)
class KitIpdFile:
name: str
url: str
@dataclass
class KitIpdFolder:
name: str
files: List[KitIpdFile]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r} (href={file.url!r})")
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler):
def __init__(
self,
name: str,
section: KitIpdCrawlerSection,
config: Config,
):
super().__init__(name, section, config)
self._url = section.target()
self._file_regex = section.link_regex()
async def _run(self) -> None:
cl = await self.crawl(PurePath("."))
if not cl:
return
async with cl:
for item in await self._fetch_items():
if isinstance(item, KitIpdFolder):
await self._crawl_folder(item)
else:
# Orphan files are placed in the root folder
await self._download_file(PurePath("."), item)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
if not await self.crawl(path):
return
for file in folder.files:
await self._download_file(path, file)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
element_path = parent / file.name
dl = await self.download(element_path)
if not dl:
return
async with dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self._get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
return items
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link)
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
if resp.content_length:
bar.set_total(resp.content_length)
async for data in resp.content.iter_chunked(1024):
sink.file.write(data)
bar.advance(len(data))
sink.done()
async def _get_page(self) -> Tuple[BeautifulSoup, str]:
response = self.session.get(self._url)
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This
# hack enables those pages to be crawled, and should hopefully not
# cause issues on other pages.
content = re.sub(r"<!--.*?-->", "", response.text)
return soupify(content.encode("utf-8")), str(request.url)

View File

@ -71,8 +71,6 @@ class LocalCrawler(Crawler):
if not cl: if not cl:
return return
tasks = []
async with cl: async with cl:
await asyncio.sleep(random.uniform( await asyncio.sleep(random.uniform(
0.5 * self._crawl_delay, 0.5 * self._crawl_delay,
@ -81,9 +79,7 @@ class LocalCrawler(Crawler):
for child in path.iterdir(): for child in path.iterdir():
pure_child = cl.path / child.name pure_child = cl.path / child.name
tasks.append(self._crawl_path(child, pure_child)) await self._crawl_path(child, pure_child)
await self.gather(tasks)
async def _crawl_file(self, path: Path, pure: PurePath) -> None: async def _crawl_file(self, path: Path, pure: PurePath) -> None:
stat = path.stat() stat = path.stat()

View File

@ -56,6 +56,12 @@ class Deduplicator:
log.explain(f"Changed path to {fmt_path(new_path)} for windows compatibility") log.explain(f"Changed path to {fmt_path(new_path)} for windows compatibility")
return new_path return new_path
def fixup_path(self, path: PurePath) -> PurePath:
"""Fixes up the path for windows, if enabled. Returns the path unchanged otherwise."""
if self._windows_paths:
return self._fixup_for_windows(path)
return path
def mark(self, path: PurePath) -> PurePath: def mark(self, path: PurePath) -> PurePath:
if self._windows_paths: if self._windows_paths:
path = self._fixup_for_windows(path) path = self._fixup_for_windows(path)

View File

@ -1,97 +0,0 @@
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Optional
@dataclass
class Slot:
active: bool = False
last_left: Optional[float] = None
class Limiter:
def __init__(
self,
task_limit: int,
download_limit: int,
task_delay: float
):
if task_limit <= 0:
raise ValueError("task limit must be at least 1")
if download_limit <= 0:
raise ValueError("download limit must be at least 1")
if download_limit > task_limit:
raise ValueError("download limit can't be greater than task limit")
if task_delay < 0:
raise ValueError("Task delay must not be negative")
self._slots = [Slot() for _ in range(task_limit)]
self._downloads = download_limit
self._delay = task_delay
self._condition = asyncio.Condition()
def _acquire_slot(self) -> Optional[Slot]:
for slot in self._slots:
if not slot.active:
slot.active = True
return slot
return None
async def _wait_for_slot_delay(self, slot: Slot) -> None:
if slot.last_left is not None:
delay = slot.last_left + self._delay - time.time()
if delay > 0:
await asyncio.sleep(delay)
def _release_slot(self, slot: Slot) -> None:
slot.last_left = time.time()
slot.active = False
@asynccontextmanager
async def limit_crawl(self) -> AsyncIterator[None]:
slot: Slot
async with self._condition:
while True:
if found_slot := self._acquire_slot():
slot = found_slot
break
await self._condition.wait()
await self._wait_for_slot_delay(slot)
try:
yield
finally:
async with self._condition:
self._release_slot(slot)
self._condition.notify_all()
@asynccontextmanager
async def limit_download(self) -> AsyncIterator[None]:
slot: Slot
async with self._condition:
while True:
if self._downloads <= 0:
await self._condition.wait()
continue
if found_slot := self._acquire_slot():
slot = found_slot
self._downloads -= 1
break
await self._condition.wait()
await self._wait_for_slot_delay(slot)
try:
yield
finally:
async with self._condition:
self._release_slot(slot)
self._downloads += 1
self._condition.notify_all()

View File

@ -5,7 +5,7 @@ from contextlib import asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated # TODO In Python 3.9 and above, ContextManager is deprecated
from typing import AsyncIterator, ContextManager, Iterator, List, Optional from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, RenderGroup from rich.console import Console, Group
from rich.live import Live from rich.live import Live
from rich.markup import escape from rich.markup import escape
from rich.panel import Panel from rich.panel import Panel
@ -68,7 +68,7 @@ class Log:
if self._download_progress.task_ids: if self._download_progress.task_ids:
elements.append(self._download_progress) elements.append(self._download_progress)
group = RenderGroup(*elements) # type: ignore group = Group(*elements)
self._live.update(group) self._live.update(group)
@contextmanager @contextmanager
@ -197,7 +197,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_explain: if self.output_explain:
self.print(f" {escape(text)}") self.print(f" {escape(text)}")
def status(self, style: str, action: str, text: str) -> None: def status(self, style: str, action: str, text: str, suffix: str = "") -> None:
""" """
Print a status update while crawling. Allows markup in the "style" Print a status update while crawling. Allows markup in the "style"
argument which will be applied to the "action" string. argument which will be applied to the "action" string.
@ -205,7 +205,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_status: if self.output_status:
action = escape(f"{action:<{self.STATUS_WIDTH}}") action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)}") self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def report(self, text: str) -> None: def report(self, text: str) -> None:
""" """

View File

@ -231,7 +231,10 @@ class OutputDirectory:
stat = local_path.stat() stat = local_path.stat()
remote_newer = None remote_newer = None
if mtime := heuristics.mtime:
# Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer: if remote_newer:
log.explain("Remote file seems to be newer") log.explain("Remote file seems to be newer")
@ -500,7 +503,7 @@ class OutputDirectory:
try: try:
self._prev_report = Report.load(self._report_path) self._prev_report = Report.load(self._report_path)
log.explain("Loaded report successfully") log.explain("Loaded report successfully")
except (OSError, json.JSONDecodeError, ReportLoadError) as e: except (OSError, UnicodeDecodeError, json.JSONDecodeError, ReportLoadError) as e:
log.explain("Failed to load report") log.explain("Failed to load report")
log.explain(str(e)) log.explain(str(e))

View File

@ -15,13 +15,13 @@ class PferdLoadError(Exception):
class Pferd: class Pferd:
def __init__(self, config: Config, cli_crawlers: Optional[List[str]]): def __init__(self, config: Config, cli_crawlers: Optional[List[str]], cli_skips: Optional[List[str]]):
""" """
May throw PferdLoadError. May throw PferdLoadError.
""" """
self._config = config self._config = config
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers) self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers, cli_skips)
self._authenticators: Dict[str, Authenticator] = {} self._authenticators: Dict[str, Authenticator] = {}
self._crawlers: Dict[str, Crawler] = {} self._crawlers: Dict[str, Crawler] = {}
@ -65,16 +65,30 @@ class Pferd:
return crawlers_to_run return crawlers_to_run
def _find_crawlers_to_run(self, config: Config, cli_crawlers: Optional[List[str]]) -> List[str]: def _find_crawlers_to_run(
self,
config: Config,
cli_crawlers: Optional[List[str]],
cli_skips: Optional[List[str]],
) -> List[str]:
log.explain_topic("Deciding which crawlers to run") log.explain_topic("Deciding which crawlers to run")
crawlers: List[str]
if cli_crawlers is None: if cli_crawlers is None:
log.explain("No crawlers specified on CLI") log.explain("No crawlers specified on CLI")
log.explain("Running crawlers specified in config") log.explain("Running crawlers specified in config")
return self._find_config_crawlers(config) crawlers = self._find_config_crawlers(config)
else: else:
log.explain("Crawlers specified on CLI") log.explain("Crawlers specified on CLI")
return self._find_cli_crawlers(config, cli_crawlers) crawlers = self._find_cli_crawlers(config, cli_crawlers)
skips = {f"crawl:{name}" for name in cli_skips} if cli_skips else set()
for crawler in crawlers:
if crawler in skips:
log.explain(f"Skipping crawler {crawler!r}")
crawlers = [crawler for crawler in crawlers if crawler not in skips]
return crawlers
def _load_authenticators(self) -> None: def _load_authenticators(self) -> None:
for name, section in self._config.auth_sections(): for name, section in self._config.auth_sections():
@ -168,5 +182,13 @@ class Pferd:
something_changed = True something_changed = True
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}") log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True
log.report(f" [bold bright_red]Warning[/] {warning}")
for error in crawler.report.encountered_errors:
something_changed = True
log.report(f" [bold bright_red]Error[/] {error}")
if not something_changed: if not something_changed:
log.report(" Nothing changed") log.report(" Nothing changed")

View File

@ -1,6 +1,6 @@
import json import json
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Dict, List, Set from typing import Any, Dict, List, Optional, Set
class ReportLoadError(Exception): class ReportLoadError(Exception):
@ -68,6 +68,13 @@ class Report:
# Files that should have been deleted by the cleanup but weren't # Files that should have been deleted by the cleanup but weren't
self.not_deleted_files: Set[PurePath] = set() self.not_deleted_files: Set[PurePath] = set()
# Custom crawler-specific data
self.custom: Dict[str, Any] = dict()
# Encountered errors and warnings
self.encountered_warnings: List[str] = []
self.encountered_errors: List[str] = []
@staticmethod @staticmethod
def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]: def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
result: Any = data.get(key, []) result: Any = data.get(key, [])
@ -81,13 +88,22 @@ class Report:
return result return result
@staticmethod
def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
result: Dict[str, Any] = data.get(key, {})
if not isinstance(result, dict):
raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")
return result
@classmethod @classmethod
def load(cls, path: Path) -> "Report": def load(cls, path: Path) -> "Report":
""" """
May raise OSError, JsonDecodeError, ReportLoadError. May raise OSError, UnicodeDecodeError, JsonDecodeError, ReportLoadError.
""" """
with open(path) as f: with open(path, encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
if not isinstance(data, dict): if not isinstance(data, dict):
@ -108,6 +124,9 @@ class Report:
self.delete_file(PurePath(elem)) self.delete_file(PurePath(elem))
for elem in self._get_list_of_strs(data, "not_deleted"): for elem in self._get_list_of_strs(data, "not_deleted"):
self.not_delete_file(PurePath(elem)) self.not_delete_file(PurePath(elem))
self.custom = self._get_str_dictionary(data, "custom")
self.encountered_errors = self._get_list_of_strs(data, "encountered_errors")
self.encountered_warnings = self._get_list_of_strs(data, "encountered_warnings")
return self return self
@ -124,9 +143,12 @@ class Report:
"changed": [str(path) for path in sorted(self.changed_files)], "changed": [str(path) for path in sorted(self.changed_files)],
"deleted": [str(path) for path in sorted(self.deleted_files)], "deleted": [str(path) for path in sorted(self.deleted_files)],
"not_deleted": [str(path) for path in sorted(self.not_deleted_files)], "not_deleted": [str(path) for path in sorted(self.not_deleted_files)],
"custom": self.custom,
"encountered_warnings": self.encountered_warnings,
"encountered_errors": self.encountered_errors,
} }
with open(path, "w") as f: with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True) json.dump(data, f, indent=2, sort_keys=True)
f.write("\n") # json.dump doesn't do this f.write("\n") # json.dump doesn't do this
@ -190,3 +212,27 @@ class Report:
""" """
self.not_deleted_files.add(path) self.not_deleted_files.add(path)
def add_custom_value(self, key: str, value: Any) -> None:
"""
Adds a custom value under the passed key, overwriting any existing
"""
self.custom[key] = value
def get_custom_value(self, key: str) -> Optional[Any]:
"""
Retrieves a custom value for the given key.
"""
return self.custom.get(key)
def add_error(self, error: str) -> None:
"""
Adds an error to this report's error list.
"""
self.encountered_errors.append(error)
def add_warning(self, warning: str) -> None:
"""
Adds a warning to this report's warning list.
"""
self.encountered_warnings.append(warning)

View File

@ -41,9 +41,11 @@ TransformResult = Optional[Union[Transformed, Ignored]]
@dataclass @dataclass
class Rule: class Rule:
left: str left: str
left_index: int
name: str name: str
head: ArrowHead head: ArrowHead
right: RightSide right: RightSide
right_index: int
def right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]: def right_result(self, path: PurePath) -> Union[str, Transformed, Ignored]:
if isinstance(self.right, str): if isinstance(self.right, str):
@ -345,6 +347,7 @@ def parse_eol(line: Line) -> None:
def parse_rule(line: Line) -> Rule: def parse_rule(line: Line) -> Rule:
parse_zero_or_more_spaces(line) parse_zero_or_more_spaces(line)
left_index = line.index
left = parse_left(line) left = parse_left(line)
parse_one_or_more_spaces(line) parse_one_or_more_spaces(line)
@ -354,19 +357,19 @@ def parse_rule(line: Line) -> Rule:
line.expect("-") line.expect("-")
head = parse_arrow_head(line) head = parse_arrow_head(line)
index = line.index right_index = line.index
right: RightSide right: RightSide
try: try:
parse_zero_or_more_spaces(line) parse_zero_or_more_spaces(line)
parse_eol(line) parse_eol(line)
right = Empty() right = Empty()
except RuleParseError: except RuleParseError:
line.index = index line.index = right_index
parse_one_or_more_spaces(line) parse_one_or_more_spaces(line)
right = parse_right(line) right = parse_right(line)
parse_eol(line) parse_eol(line)
return Rule(left, name, head, right) return Rule(left, left_index, name, head, right, right_index)
def parse_transformation(line: Line) -> Transformation: def parse_transformation(line: Line) -> Transformation:
@ -377,6 +380,9 @@ def parse_transformation(line: Line) -> Transformation:
elif rule.name == "exact": elif rule.name == "exact":
return ExactTf(rule) return ExactTf(rule)
elif rule.name == "name": elif rule.name == "name":
if len(PurePath(rule.left).parts) > 1:
line.index = rule.left_index
raise RuleParseError(line, "Expected name, not multiple segments")
return RenamingPartsTf(ExactTf(rule)) return RenamingPartsTf(ExactTf(rule))
elif rule.name == "re": elif rule.name == "re":
return RenamingParentsTf(ExactReTf(rule)) return RenamingParentsTf(ExactReTf(rule))

View File

@ -92,17 +92,32 @@ def url_set_query_params(url: str, params: Dict[str, str]) -> str:
def str_path(path: PurePath) -> str: def str_path(path: PurePath) -> str:
"""
Turn a path into a string, in a platform-independent way.
This function always uses "/" as path separator, even on Windows.
"""
if not path.parts: if not path.parts:
return "." return "."
return "/".join(path.parts) return "/".join(path.parts)
def fmt_path(path: PurePath) -> str: def fmt_path(path: PurePath) -> str:
"""
Turn a path into a delimited string.
This is useful if file or directory names contain weird characters like
newlines, leading/trailing whitespace or unprintable characters. This way,
they are escaped and visible to the user.
"""
return repr(str_path(path)) return repr(str_path(path))
def fmt_real_path(path: Path) -> str: def fmt_real_path(path: Path) -> str:
return repr(str(path.absolute())) """
Like fmt_path, but resolves the path before converting it to a string.
"""
return fmt_path(path.absolute())
class ReusableAsyncContextManager(ABC, Generic[T]): class ReusableAsyncContextManager(ABC, Generic[T]):

View File

@ -1,2 +1,2 @@
NAME = "PFERD" NAME = "PFERD"
VERSION = "3.1.0" VERSION = "3.4.3"

View File

@ -17,7 +17,7 @@ Binaries for Linux, Windows and Mac can be downloaded directly from the
### With pip ### With pip
Ensure you have at least Python 3.8 installed. Run the following command to Ensure you have at least Python 3.9 installed. Run the following command to
install PFERD or upgrade it to the latest version: install PFERD or upgrade it to the latest version:
``` ```
@ -26,6 +26,17 @@ $ pip install --upgrade git+https://github.com/Garmelon/PFERD@latest
The use of [venv](https://docs.python.org/3/library/venv.html) is recommended. The use of [venv](https://docs.python.org/3/library/venv.html) is recommended.
### With package managers
Unofficial packages are available for:
- [AUR](https://aur.archlinux.org/packages/pferd)
- [brew](https://formulae.brew.sh/formula/pferd)
- [conda-forge](https://github.com/conda-forge/pferd-feedstock)
- [nixpkgs](https://github.com/NixOS/nixpkgs/blob/master/pkgs/tools/misc/pferd/default.nix)
- [PyPi](https://pypi.org/project/pferd)
See also PFERD's [repology page](https://repology.org/project/pferd/versions).
## Basic usage ## Basic usage
PFERD can be run directly from the command line with no config file. Run `pferd PFERD can be run directly from the command line with no config file. Run `pferd

View File

@ -14,4 +14,4 @@ pip install --editable .
# Installing tools and type hints # Installing tools and type hints
pip install --upgrade mypy flake8 autopep8 isort pyinstaller pip install --upgrade mypy flake8 autopep8 isort pyinstaller
pip install --upgrade types-chardet types-certifi mypy PFERD --install-types --non-interactive

View File

@ -4,13 +4,14 @@ version = attr: PFERD.version.VERSION
[options] [options]
packages = find: packages = find:
python_requires = >=3.8 python_requires = >=3.9
install_requires = install_requires =
aiohttp>=3.7.4.post0 aiohttp>=3.8.1
beautifulsoup4>=4.9.3 beautifulsoup4>=4.10.0
rich>=10.1.0 rich>=11.0.0
keyring>=23.0.1 keyring>=23.5.0
certifi>=2020.12.5 certifi>=2021.10.8
requests>=2.28.1
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =