Compare commits


73 Commits

Author SHA1 Message Date
4af02012bc Strip long path prefix from file links in report 2024-11-14 20:06:13 +01:00
287173b0b1 Bump version to 3.7.0 2024-11-13 20:38:27 +01:00
712217e959 Handle groups in cards 2024-11-11 12:53:08 +01:00
6dda4c55a8 Add doctype header to forum threads
This should fix mimetype detection on most systems and is more relevant
now that the report is clickable
2024-11-05 18:36:21 +01:00
596b6a7688 Add support for non-KIT shibboleth login (#98)
Co-authored-by: Mr-Pine <git@mr-pine.de>
Co-authored-by: I-Al-Istannen <I-Al-Istannen@users.noreply.github.com>
2024-11-05 18:30:34 +01:00
Tim 5983200247 Treat headings as folders in kit-ipd crawler (#99) 2024-11-04 23:53:48 +01:00
Tim 26e802d88b Add clickable links to file names in the printed report (#100)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-11-04 00:32:32 +01:00
f5c4e82816 Delay ilias loop detection after transform
This allows users to filter out duplicated elements and suppress the
warning.
2024-11-02 22:46:51 +01:00
f5273f7ca0 Collapse ilias url crawling into normal page crawling 2024-11-02 22:46:51 +01:00
fa71a9f44f Add support for mob videos in page descriptions 2024-10-28 20:35:30 +01:00
81d6ff53c4 Respect row flex in descriptions 2024-10-28 19:41:03 +01:00
d7a2b6e019 Delete videos from course descriptions 2024-10-28 19:41:03 +01:00
71c65e89d1 Internalize images in course descriptions 2024-10-28 19:41:03 +01:00
c1046498e7 Fix download of links without a target URL
They are now downloaded as links to the empty url.
2024-10-28 19:41:03 +01:00
8fbd1978af Fix crawling of nested courses 2024-10-28 18:52:27 +01:00
Tim 739dd95850 Use Last-Modified and ETag headers to determine KIT-IPD file versions (#95)
Co-authored-by: I-Al-Istannen <i-al-istannen@users.noreply.github.com>
2024-10-27 19:03:47 +01:00
c54c3bcfa1 Fix crawling of favorites 2024-10-27 10:50:59 +01:00
d7f2229978 Bump version to 3.6.0 2024-10-23 20:17:47 +02:00
52fdeae752 Crawl custom item groups as folders 2024-10-21 23:43:48 +02:00
f9bb2e41cf Sanitize slashes in exercise container names 2024-10-21 22:30:16 +02:00
4f9e2ab48d Support named capture groups in regex transformers (#94) 2024-10-21 15:21:33 +02:00
19beb8f07b Document course overview downloading in config.md 2024-07-31 22:02:43 +02:00
c897d9e2f5 Support finding entries for course overview page
Related to issue #93
2024-06-26 16:54:07 +02:00
21a266e302 Update upload-artifact action to v4
https://github.com/actions/upload-artifact/blob/main/docs/MIGRATION.md#multiple-uploads-to-the-same-named-artifact
2024-05-11 16:33:14 +02:00
b29b6f93f8 run ci twice
Co-authored-by: Garmelon <joscha@plugh.de>
2024-05-11 16:09:46 +02:00
318226d7cb fix bump-version script 2024-05-11 10:27:54 +02:00
422cf05f15 Move all configuration into pyproject.toml, add x86 mac to CI 2024-05-11 10:26:19 +02:00
819c6673c7 Update changelog 2024-05-10 14:40:25 +02:00
89b44c69a7 Update docs
All config file options must be documented in CONFIG.md. The README.md
is just a starting point. To avoid duplicated info, I've moved most of
the docs to CONFIG.md.
2024-05-10 14:36:01 +02:00
4b4f72b2ca Fix command name 2024-05-10 14:34:20 +02:00
778517d8c6 Fix KIT crawler requiring base_url and client_id options 2024-05-10 14:12:45 +02:00
428b0179fc Remove IliasConfig
Also uses urljoin() in a few places that previously used string
concatenation or fstrings.

At this point, there isn't yet a need for IliasConfig, so I'd rather
keep the code base simpler and more consistent. Should we need a
structure like IliasConfig in the future (maybe because we have a few
more ilias parsers), it's easy to add back.
2024-05-10 14:09:14 +02:00
ade6309dd9 Update copyright information 2024-05-05 02:34:26 +02:00
fd6cb7b966 docs: Remove some filler words 2024-05-05 02:34:00 +02:00
5c87517ceb docs: Explain usage with generic ilias 2024-05-04 17:52:12 +02:00
b01f093474 fix: Element detection for other universities
Other universities might use other URL schemes
for different element types
2024-05-04 17:52:06 +02:00
3a05b90525 fix circular import for _io_repeat 2024-05-04 17:51:59 +02:00
7a00f73e0e feat: Add authentication to generic ilias dl 2024-05-04 17:51:38 +02:00
5d0621420e feat: Generic ilias_web command 2024-05-04 17:44:37 +02:00
df98153169 refactor: Extract generic settings from ilias command
Preparation for generic ilias_web command
2024-05-04 17:44:30 +02:00
fc1f68ccd9 refactor: Separate generic and KIT ilias functions 2024-05-04 17:44:18 +02:00
3e831c7e23 Fix normalization of meeting names in cards 2024-04-24 22:32:26 +02:00
bbcfe9c8dd Fix typo in CONFIG.md (#89) 2024-04-19 16:52:18 +02:00
eb01aa86cb Bump version to 3.5.2 2024-04-14 12:10:17 +02:00
3db186a978 Fix personal desktop crawling HTML warnings 2024-04-10 11:15:25 +02:00
4a5959fd58 Fix personal desktop crawling without favorites 2024-04-10 11:15:25 +02:00
1cbc2b717a Fix personal desktop crawling with ILIAS 8 2024-04-10 01:20:37 +02:00
da627ff929 Bump version to 3.5.1 2024-04-09 14:28:56 +02:00
c1b592ac29 Fix ILIAS 8 file downloads truncating to zero bytes 2024-04-08 17:59:41 +02:00
eb0c956d32 Add compatibility with ILIAS 8 2024-04-05 19:08:05 +02:00
ab0cb2d956 nix: bump nixpgs dependency 2024-02-27 23:39:53 +01:00
a117126389 Fix video name deduplication 2023-12-09 23:08:42 +01:00
e9f8901520 Fix typos in ilias crawler and use set literals 2023-11-30 20:57:57 +01:00
266812f90e Move is_logged_in helper to kit_ilias_html 2023-11-16 11:19:20 +01:00
533bc27439 Bump version to 3.5.0 2023-09-13 23:13:30 +02:00
0113a0ca10 Update flake.lock 2023-09-13 22:23:36 +02:00
40f8a05ad6 Add .idea to gitignore 2023-09-13 22:23:36 +02:00
50b50513c6 Ignore SCORM learning modules 2023-08-29 13:51:19 +02:00
df3514cd03 Crawl paginated past meetings 2023-08-29 12:41:21 +02:00
ad53185247 Sanitize ascii control characters on windows 2023-08-29 12:41:15 +02:00
87b67e9271 Crawl files in the info tab 2023-08-29 12:41:15 +02:00
b54b3b979c Remove size suffix for content pages 2023-08-27 11:43:05 +02:00
2184ac8040 Add support for ILIAS mediacast listings 2023-08-27 11:43:05 +02:00
b3d412360b Add Nix flake 2023-08-26 23:54:19 +02:00
dbc2553b11 Add default show-not-deleted option
If set to `no`, PFERD won't print status or report messages for not deleted files
2023-08-26 18:43:01 +02:00
68c398f1fe Add support for ILIAS learning modules 2023-08-02 13:34:54 +02:00
123a57beec Fix mypy unreachable error in file_templates 2023-07-29 18:36:33 +02:00
d204dac8ce Detect unexpected root page redirects and abort operation 2023-07-29 18:36:33 +02:00
443f7fe839 Add no-delete-prompt-overwrite crawler conflict resolution option (#75) 2023-07-29 18:36:33 +02:00
0294ceb7d5 Update github action versions 2023-03-22 00:10:54 +01:00
6f30c6583d Fix crawling of cards without descriptions 2023-03-21 23:52:33 +01:00
467fc526e8 Fix crawling of file/video cards 2023-03-21 23:52:24 +01:00
722d2eb393 Fix crawling of courses with preselected timeline tab 2023-03-21 23:36:47 +01:00
41 changed files with 2461 additions and 1295 deletions

.github/dependabot.yml (vendored, new file)

@@ -0,0 +1,10 @@
version: 2
updates:
  - package-ecosystem: github-actions
    directory: /
    schedule:
      interval: monthly
    groups:
      gh-actions:
        patterns:
          - "*"

@@ -1,6 +1,6 @@
 name: build-and-release
 
-on: push
+on: [push, pull_request]
 
 defaults:
   run:
@@ -13,13 +13,12 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-latest, windows-latest, macos-latest]
+        os: [ubuntu-latest, windows-latest, macos-13, macos-latest]
         python: ["3.9"]
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python }}
@@ -34,7 +33,12 @@ jobs:
         run: ./scripts/setup --no-pip
       - name: Run checks
-        run: ./scripts/check
+        run: |
+          ./scripts/check
+          ./scripts/format
+      - name: Assert no changes
+        run: git diff --exit-code
       - name: Build
         run: ./scripts/build
@@ -45,9 +49,9 @@ jobs:
         run: mv dist/pferd* dist/pferd-${{ matrix.os }}
       - name: Upload binary
-        uses: actions/upload-artifact@v2
+        uses: actions/upload-artifact@v4
         with:
-          name: Binaries
+          name: pferd-${{ matrix.os }}
           path: dist/pferd-${{ matrix.os }}
 
   release:
@@ -57,18 +61,20 @@ jobs:
     steps:
       - name: Download binaries
-        uses: actions/download-artifact@v2
+        uses: actions/download-artifact@v4
         with:
-          name: Binaries
+          pattern: pferd-*
+          merge-multiple: true
       - name: Rename binaries
         run: |
           mv pferd-ubuntu-latest pferd-linux
           mv pferd-windows-latest pferd-windows.exe
+          mv pferd-macos-13 pferd-mac-x86_64
           mv pferd-macos-latest pferd-mac
       - name: Create release
-        uses: softprops/action-gh-release@v1
+        uses: softprops/action-gh-release@v2
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         with:
@@ -76,3 +82,4 @@ jobs:
             pferd-linux
             pferd-windows.exe
             pferd-mac
+            pferd-mac-x86_64

.gitignore (vendored)

@@ -3,6 +3,7 @@
 /PFERD.egg-info/
 __pycache__/
 /.vscode/
+/.idea/
 
 # pyinstaller
 /pferd.spec

@@ -22,6 +22,77 @@ ambiguous situations.
 ## Unreleased
 
+## 3.7.0 - 2024-11-13
+
+### Added
+- Support for MOB videos in page descriptions
+- Clickable links in the report to directly open new/modified/not-deleted files
+- Support for non-KIT shibboleth login
+
+### Changed
+- Remove videos from description pages
+- Perform ILIAS cycle detection after processing the transform to allow
+  ignoring duplicated elements
+- Parse headings (h1-h3) as folders in kit-ipd crawler
+
+### Fixed
+- Personal desktop/dashboard/favorites crawling
+- Crawling of nested courses
+- Downloading of links with no target URL
+- Handle row flex on description pages
+- Add `<!DOCTYPE html>` heading to forum threads to fix mime type detection
+- Handle groups in cards
+
+## 3.6.0 - 2024-10-23
+
+### Added
+- Generic `ilias-web` crawler and `ilias-web` CLI command
+- Support for the course overview page. Using this URL as a target might cause
+  duplication warnings, as subgroups are listed separately.
+- Support for named capture groups in regex transforms
+- Crawl custom item groups as folders
+
+### Fixed
+- Normalization of meeting names in cards
+- Sanitization of slashes in exercise container names
+
+## 3.5.2 - 2024-04-14
+
+### Fixed
+- Crawling of personal desktop with ILIAS 8
+- Crawling of empty personal desktops
+
+## 3.5.1 - 2024-04-09
+
+### Added
+- Support for ILIAS 8
+
+### Fixed
+- Video name deduplication
+
+## 3.5.0 - 2023-09-13
+
+### Added
+- `no-delete-prompt-overwrite` conflict resolution strategy
+- Support for ILIAS learning modules
+- `show_not_deleted` option to stop printing the "Not Deleted" status or report
+  message. This combines nicely with the `no-delete-prompt-overwrite` strategy,
+  causing PFERD to mostly ignore local-only files.
+- Support for mediacast video listings
+- Crawling of files in info tab
+
+### Changed
+- Remove size suffix for files in content pages
+
+### Fixed
+- Crawling of courses with the timeline view as the default tab
+- Crawling of file and custom opencast cards
+- Crawling of button cards without descriptions
+- Abort crawling when encountering an unexpected ilias root page redirect
+- Sanitize ascii control characters on Windows
+- Crawling of paginated past meetings
+- Ignore SCORM learning modules
+
 ## 3.4.3 - 2022-11-29
 
 ### Added

@@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header,
 which is followed by a list of `key = value` pairs. Comments must be on their
 own line and start with `#`. Multiline values must be indented beyond their key.
 Boolean values can be `yes` or `no`. For more details and some examples on the
-format, see the [configparser documentation][1] ([interpolation][2] is
-disabled).
+format, see the [configparser documentation][cp-file]
+([interpolation][cp-interp] is disabled).
 
-[1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
-[2]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
+[cp-file]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
+[cp-interp]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
 
 ## The `DEFAULT` section
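
To make the described format concrete, a config file sketch might look like this (all section and option names here are illustrative, not taken from the documentation above):

```ini
# A comment on its own line
[some-section]
some_key = some value
a_boolean = yes
multiline_value =
    first line
    second line
```
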
@@ -26,6 +26,9 @@ default values for the other sections.
   `Added ...`) while running a crawler. (Default: `yes`)
 - `report`: Whether PFERD should print a report of added, changed and deleted
   local files for all crawlers before exiting. (Default: `yes`)
+- `show_not_deleted`: Whether PFERD should print messages in status and report
+  when a local-only file wasn't deleted. Combines nicely with the
+  `no-delete-prompt-overwrite` conflict resolution strategy.
 - `share_cookies`: Whether crawlers should share cookies where applicable. For
   example, some crawlers share cookies if they crawl the same website using the
   same account. (Default: `yes`)
@@ -75,6 +78,9 @@ common to all crawlers:
     using `prompt` and always choosing "yes".
   - `no-delete`: Never delete local files, but overwrite local files if the
     remote file is different.
+  - `no-delete-prompt-overwrite`: Never delete local files, but prompt to
+    overwrite local files if the remote file is different. Combines nicely
+    with the `show_not_deleted` option.
 - `transform`: Rules for renaming and excluding certain files and directories.
   For more details, see [this section](#transformation-rules). (Default: empty)
 - `tasks`: The maximum number of concurrent tasks (such as crawling or
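
As a sketch of how the two additions above combine, a config that keeps local-only files around without reporting them on every run might look like this (the crawler section name, target and output directory are placeholders):

```ini
[DEFAULT]
show_not_deleted = no

[crawl:some-course]
type = kit-ilias-web
target = 1234567
output_dir = SomeCourse
auth = auth:ilias
on_conflict = no-delete-prompt-overwrite
```
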
@@ -140,7 +146,7 @@ crawler simulate a slower, network-based crawler.
 This crawler crawls a KIT-IPD page by url. The root page can be crawled from
 outside the KIT network so you will be informed about any new/deleted files,
-but downloading files requires you to be within. Adding a show delay between
+but downloading files requires you to be within. Adding a short delay between
 requests is likely a good idea.
 
 - `target`: URL to a KIT-IPD page
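
A minimal `kit-ipd` section along these lines might look like the following sketch (the target URL is a placeholder and `link_regex` merely restates the documented default):

```ini
[crawl:ipd]
type = kit-ipd
target = https://example.ipd.kit.edu/some/lecture/page
link_regex = ^.*?[^/]+\.(pdf|zip|c|cpp|java)$
```
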
@@ -148,6 +154,63 @@ requests is likely a good idea.
   matches, the given link is downloaded as a file. This is used to extract
   files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`)
 
+### The `ilias-web` crawler
+
+This crawler crawls a generic ILIAS instance.
+
+Inspired by [this ILIAS downloader][ilias-dl], the following configurations should work
+out of the box for the corresponding universities:
+
+[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs"
+
+| University    | `base_url`                              | `login_type` | `client_id`   |
+|---------------|-----------------------------------------|--------------|---------------|
+| FH Aachen     | https://www.ili.fh-aachen.de            | local        | elearning     |
+| Uni Köln      | https://www.ilias.uni-koeln.de/ilias    | local        | uk            |
+| Uni Konstanz  | https://ilias.uni-konstanz.de           | local        | ILIASKONSTANZ |
+| Uni Stuttgart | https://ilias3.uni-stuttgart.de         | local        | Uni_Stuttgart |
+| Uni Tübingen  | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth   |               |
+
+If your university isn't listed, try navigating to your instance's login page.
+Assuming no custom login service is used, the URL will look something like this:
+
+```jinja
+{{ base_url }}/login.php?client_id={{ client_id }}&cmd=force_login&lang=
+```
+
+If the values work, feel free to submit a PR and add them to the table above.
+
+- `base_url`: The URL where the ILIAS instance is located. (Required)
+- `login_type`: How you authenticate. (Required)
+  - `local`: Use `client_id` for authentication.
+  - `shibboleth`: Use shibboleth for authentication.
+- `client_id`: An ID used for authentication if `login_type` is `local`. Is
+  ignored if `login_type` is `shibboleth`.
+- `target`: The ILIAS element to crawl. (Required)
+  - `desktop`: Crawl your personal desktop / dashboard
+  - `<course id>`: Crawl the course with the given id
+  - `<url>`: Crawl a given element by URL (preferably the permanent URL linked
+    at the bottom of its ILIAS page).
+    This also supports the "My Courses" overview page to download *all*
+    courses. Note that this might produce confusing local directory layouts
+    and duplication warnings if you are a member of an ILIAS group. The
+    `desktop` target is generally preferable.
+- `auth`: Name of auth section to use for login. (Required)
+- `tfa_auth`: Name of auth section to use for two-factor authentication. Only
+  uses the auth section's password. (Default: Anonymous `tfa` authenticator)
+- `links`: How to represent external links. (Default: `fancy`)
+  - `ignore`: Don't download links.
+  - `plaintext`: A text file containing only the URL.
+  - `fancy`: An HTML file looking like the ILIAS link element.
+  - `internet-shortcut`: An internet shortcut file (`.url` file).
+- `link_redirect_delay`: Time (in seconds) until `fancy` link files will
+  redirect to the actual URL. Set to a negative value to disable the automatic
+  redirect. (Default: `-1`)
+- `videos`: Whether to download videos. (Default: `no`)
+- `forums`: Whether to download forum threads. (Default: `no`)
+- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default:
+  `20.0`)
+
 ### The `kit-ilias-web` crawler
 
 This crawler crawls the KIT ILIAS instance.
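
Combining the table with the options documented above, a complete `ilias-web` setup might look like this sketch (course id, output directory and section names are placeholders; the Uni Stuttgart row serves as the example instance):

```ini
[crawl:my-course]
type = ilias-web
base_url = https://ilias3.uni-stuttgart.de
login_type = local
client_id = Uni_Stuttgart
target = 1234567
output_dir = MyCourse
auth = auth:my-ilias

[auth:my-ilias]
type = simple
username = myuser
```
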
@@ -226,10 +289,10 @@ is stored in the keyring.
 
 ### The `pass` authenticator
 
-This authenticator queries the [`pass` password manager][3] for a username and
-password. It tries to be mostly compatible with [browserpass][4] and
-[passff][5], so see those links for an overview of the format. If PFERD fails
-to load your password, you can use the `--explain` flag to see why.
+This authenticator queries the [`pass` password manager][pass] for a username
+and password. It tries to be mostly compatible with [browserpass][browserpass]
+and [passff][passff], so see those links for an overview of the format. If PFERD
+fails to load your password, you can use the `--explain` flag to see why.
 
 - `passname`: The name of the password to use (Required)
 - `username_prefixes`: A comma-separated list of username line prefixes
@@ -237,9 +300,9 @@ to load your password, you can use the `--explain` flag to see why.
 - `password_prefixes`: A comma-separated list of password line prefixes
   (Default: `password,pass,secret`)
 
-[3]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager"
-[4]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store"
-[5]: <https://github.com/passff/passff#multi-line-format> "Multi-line format"
+[pass]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager"
+[browserpass]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store"
+[passff]: <https://github.com/passff/passff#multi-line-format> "Multi-line format"
 
 ### The `tfa` authenticator
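
A `pass`-backed auth section matching this description can be as small as the following sketch (the password-store entry name is a placeholder):

```ini
[auth:my-pass]
type = pass
passname = uni/ilias
```
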
@@ -338,7 +401,8 @@ matches `SOURCE`, the output path is created using `TARGET` as template.
 be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path.
 If capturing group *n*'s contents are a valid integer, the integer value is
 available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a
-valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). If a
+valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). Named capture
+groups (e.g. `(?P<name>)`) are available by their name (e.g. `{name}`). If a
 capturing group is not present (e.g. when matching the string `cd` with the
 regex `(ab)?cd`), the corresponding variables are not defined.
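
To illustrate the named-group addition, a transform rule like the following sketch (file names invented) would rename `Tut 5.pdf` to `Tutorials/Tut_5.pdf` by referencing the `num` group as `{num}`:

```ini
transform =
    "Tut (?P<num>\d+).pdf" -re-> "Tutorials/Tut_{num}.pdf"
```
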

@@ -1,5 +1,6 @@
-Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
-TheChristophe, Scriptim, thelukasprobst, Toorero
+Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
+TheChristophe, Scriptim, thelukasprobst, Toorero,
+Mr-Pine, p-fruck, PinieP
 
 Permission is hereby granted, free of charge, to any person obtaining a copy of
 this software and associated documentation files (the "Software"), to deal in

@@ -47,6 +47,8 @@ def configure_logging_from_args(args: argparse.Namespace) -> None:
         log.output_explain = args.explain
     if args.status is not None:
         log.output_status = args.status
+    if args.show_not_deleted is not None:
+        log.output_not_deleted = args.show_not_deleted
     if args.report is not None:
         log.output_report = args.report
@@ -72,6 +74,8 @@ def configure_logging_from_config(args: argparse.Namespace, config: Config) -> None:
             log.output_status = config.default_section.status()
         if args.report is None:
             log.output_report = config.default_section.report()
+        if args.show_not_deleted is None:
+            log.output_not_deleted = config.default_section.show_not_deleted()
     except ConfigOptionError as e:
         log.error(str(e))
         sys.exit(1)

@@ -8,6 +8,7 @@
 # well.
 
 from . import command_local  # noqa: F401 imported but unused
+from . import command_ilias_web  # noqa: F401 imported but unused
 from . import command_kit_ilias_web  # noqa: F401 imported but unused
 from . import command_kit_ipd  # noqa: F401 imported but unused
 from .parser import PARSER, ParserLoadError, load_default_section  # noqa: F401 imported but unused

@@ -0,0 +1,56 @@
import argparse
import configparser

from ..logging import log
from .common_ilias_args import configure_common_group_args, load_common
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler

COMMAND_NAME = "ilias-web"

SUBPARSER = SUBPARSERS.add_parser(
    COMMAND_NAME,
    parents=[CRAWLER_PARSER],
)

GROUP = SUBPARSER.add_argument_group(
    title=f"{COMMAND_NAME} crawler arguments",
    description=f"arguments for the '{COMMAND_NAME}' crawler",
)

GROUP.add_argument(
    "--base-url",
    type=str,
    metavar="BASE_URL",
    help="The base url of the ilias instance"
)

GROUP.add_argument(
    "--client-id",
    type=str,
    metavar="CLIENT_ID",
    help="The client id of the ilias instance"
)

configure_common_group_args(GROUP)


def load(
    args: argparse.Namespace,
    parser: configparser.ConfigParser,
) -> None:
    log.explain(f"Creating config for command '{COMMAND_NAME}'")

    parser["crawl:ilias"] = {}
    section = parser["crawl:ilias"]
    load_crawler(args, section)

    section["type"] = COMMAND_NAME
    if args.base_url is not None:
        section["base_url"] = args.base_url
    if args.client_id is not None:
        section["client_id"] = args.client_id

    load_common(section, args, parser)


SUBPARSER.set_defaults(command=load)

@@ -1,120 +1,37 @@
 import argparse
 import configparser
-from pathlib import Path
 
-from ..crawl.ilias.file_templates import Links
 from ..logging import log
-from .parser import (CRAWLER_PARSER, SUBPARSERS, BooleanOptionalAction, ParserLoadError, load_crawler,
-                     show_value_error)
+from .common_ilias_args import configure_common_group_args, load_common
+from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
+
+COMMAND_NAME = "kit-ilias-web"
 
 SUBPARSER = SUBPARSERS.add_parser(
-    "kit-ilias-web",
+    COMMAND_NAME,
     parents=[CRAWLER_PARSER],
 )
 
 GROUP = SUBPARSER.add_argument_group(
-    title="kit-ilias-web crawler arguments",
-    description="arguments for the 'kit-ilias-web' crawler",
+    title=f"{COMMAND_NAME} crawler arguments",
+    description=f"arguments for the '{COMMAND_NAME}' crawler",
 )
 
-GROUP.add_argument(
-    "target",
-    type=str,
-    metavar="TARGET",
-    help="course id, 'desktop', or ILIAS URL to crawl"
-)
-GROUP.add_argument(
-    "output",
-    type=Path,
-    metavar="OUTPUT",
-    help="output directory"
-)
-GROUP.add_argument(
-    "--username", "-u",
-    type=str,
-    metavar="USERNAME",
-    help="user name for authentication"
-)
-GROUP.add_argument(
-    "--keyring",
-    action=BooleanOptionalAction,
-    help="use the system keyring to store and retrieve passwords"
-)
-GROUP.add_argument(
-    "--credential-file",
-    type=Path,
-    metavar="PATH",
-    help="read username and password from a credential file"
-)
-GROUP.add_argument(
-    "--links",
-    type=show_value_error(Links.from_string),
-    metavar="OPTION",
-    help="how to represent external links"
-)
-GROUP.add_argument(
-    "--link-redirect-delay",
-    type=int,
-    metavar="SECONDS",
-    help="time before 'fancy' links redirect to to their target (-1 to disable)"
-)
-GROUP.add_argument(
-    "--videos",
-    action=BooleanOptionalAction,
-    help="crawl and download videos"
-)
-GROUP.add_argument(
-    "--forums",
-    action=BooleanOptionalAction,
-    help="crawl and download forum posts"
-)
-GROUP.add_argument(
-    "--http-timeout", "-t",
-    type=float,
-    metavar="SECONDS",
-    help="timeout for all HTTP requests"
-)
+configure_common_group_args(GROUP)
 
 
 def load(
     args: argparse.Namespace,
     parser: configparser.ConfigParser,
 ) -> None:
-    log.explain("Creating config for command 'kit-ilias-web'")
+    log.explain(f"Creating config for command '{COMMAND_NAME}'")
 
     parser["crawl:ilias"] = {}
     section = parser["crawl:ilias"]
     load_crawler(args, section)
 
-    section["type"] = "kit-ilias-web"
-    section["target"] = str(args.target)
-    section["output_dir"] = str(args.output)
-    section["auth"] = "auth:ilias"
-    if args.links is not None:
-        section["links"] = str(args.links.value)
-    if args.link_redirect_delay is not None:
-        section["link_redirect_delay"] = str(args.link_redirect_delay)
-    if args.videos is not None:
-        section["videos"] = "yes" if args.videos else "no"
-    if args.forums is not None:
-        section["forums"] = "yes" if args.forums else "no"
-    if args.http_timeout is not None:
-        section["http_timeout"] = str(args.http_timeout)
-
-    parser["auth:ilias"] = {}
-    auth_section = parser["auth:ilias"]
-    if args.credential_file is not None:
-        if args.username is not None:
-            raise ParserLoadError("--credential-file and --username can't be used together")
-        if args.keyring:
-            raise ParserLoadError("--credential-file and --keyring can't be used together")
-        auth_section["type"] = "credential-file"
-        auth_section["path"] = str(args.credential_file)
-    elif args.keyring:
-        auth_section["type"] = "keyring"
-    else:
-        auth_section["type"] = "simple"
-    if args.username is not None:
-        auth_section["username"] = args.username
+    section["type"] = COMMAND_NAME
+    load_common(section, args, parser)
 
 
 SUBPARSER.set_defaults(command=load)

@@ -0,0 +1,104 @@
import argparse
import configparser
from pathlib import Path

from ..crawl.ilias.file_templates import Links
from .parser import BooleanOptionalAction, ParserLoadError, show_value_error


def configure_common_group_args(group: argparse._ArgumentGroup) -> None:
    """These arguments are shared between the KIT and generic Ilias web command."""
    group.add_argument(
        "target",
        type=str,
        metavar="TARGET",
        help="course id, 'desktop', or ILIAS URL to crawl"
    )
    group.add_argument(
        "output",
        type=Path,
        metavar="OUTPUT",
        help="output directory"
    )
    group.add_argument(
        "--username", "-u",
        type=str,
        metavar="USERNAME",
        help="user name for authentication"
    )
    group.add_argument(
        "--keyring",
        action=BooleanOptionalAction,
        help="use the system keyring to store and retrieve passwords"
    )
    group.add_argument(
        "--credential-file",
        type=Path,
        metavar="PATH",
        help="read username and password from a credential file"
    )
    group.add_argument(
        "--links",
        type=show_value_error(Links.from_string),
        metavar="OPTION",
        help="how to represent external links"
    )
    group.add_argument(
        "--link-redirect-delay",
        type=int,
        metavar="SECONDS",
        help="time before 'fancy' links redirect to their target (-1 to disable)"
    )
    group.add_argument(
        "--videos",
        action=BooleanOptionalAction,
        help="crawl and download videos"
    )
    group.add_argument(
        "--forums",
        action=BooleanOptionalAction,
        help="crawl and download forum posts"
    )
    group.add_argument(
        "--http-timeout", "-t",
        type=float,
        metavar="SECONDS",
        help="timeout for all HTTP requests"
    )


def load_common(
    section: configparser.SectionProxy,
    args: argparse.Namespace,
    parser: configparser.ConfigParser,
) -> None:
    """Load common config between generic and KIT ilias web command"""
    section["target"] = str(args.target)
    section["output_dir"] = str(args.output)
    section["auth"] = "auth:ilias"
    if args.links is not None:
        section["links"] = str(args.links.value)
    if args.link_redirect_delay is not None:
        section["link_redirect_delay"] = str(args.link_redirect_delay)
    if args.videos is not None:
        section["videos"] = "yes" if args.videos else "no"
    if args.forums is not None:
        section["forums"] = "yes" if args.forums else "no"
    if args.http_timeout is not None:
        section["http_timeout"] = str(args.http_timeout)

    parser["auth:ilias"] = {}
    auth_section = parser["auth:ilias"]
    if args.credential_file is not None:
        if args.username is not None:
            raise ParserLoadError("--credential-file and --username can't be used together")
        if args.keyring:
            raise ParserLoadError("--credential-file and --keyring can't be used together")
        auth_section["type"] = "credential-file"
        auth_section["path"] = str(args.credential_file)
    elif args.keyring:
        auth_section["type"] = "keyring"
    else:
        auth_section["type"] = "simple"
    if args.username is not None:
        auth_section["username"] = args.username

@@ -215,6 +215,11 @@ PARSER.add_argument(
     action=BooleanOptionalAction,
     help="whether crawlers should share cookies where applicable"
 )
+PARSER.add_argument(
+    "--show-not-deleted",
+    action=BooleanOptionalAction,
+    help="print messages in status and report when PFERD did not delete a local only file"
+)
 
 
 def load_default_section(
@@ -233,6 +238,8 @@ def load_default_section(
         section["report"] = "yes" if args.report else "no"
     if args.share_cookies is not None:
         section["share_cookies"] = "yes" if args.share_cookies else "no"
+    if args.show_not_deleted is not None:
+        section["show_not_deleted"] = "yes" if args.show_not_deleted else "no"
 
 
 SUBPARSERS = PARSER.add_subparsers(title="crawlers")

@@ -82,6 +82,9 @@ class DefaultSection(Section):
     def report(self) -> bool:
         return self.s.getboolean("report", fallback=True)
 
+    def show_not_deleted(self) -> bool:
+        return self.s.getboolean("show_not_deleted", fallback=True)
+
     def share_cookies(self) -> bool:
         return self.s.getboolean("share_cookies", fallback=True)

@@ -4,7 +4,7 @@ from typing import Callable, Dict
 from ..auth import Authenticator
 from ..config import Config
 from .crawler import Crawler, CrawlError, CrawlerSection  # noqa: F401
-from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection
+from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection
 from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
 from .local_crawler import LocalCrawler, LocalCrawlerSection
@@ -18,6 +18,8 @@ CrawlerConstructor = Callable[[
 CRAWLERS: Dict[str, CrawlerConstructor] = {
     "local": lambda n, s, c, a:
         LocalCrawler(n, LocalCrawlerSection(s), c),
+    "ilias-web": lambda n, s, c, a:
+        IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
     "kit-ilias-web": lambda n, s, c, a:
         KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
     "kit-ipd": lambda n, s, c, a:

@@ -258,6 +258,10 @@ class Crawler(ABC):
     def prev_report(self) -> Optional[Report]:
         return self._output_dir.prev_report
 
+    @property
+    def output_dir(self) -> OutputDirectory:
+        return self._output_dir
+
     @staticmethod
     async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
         """
@@ -293,6 +297,8 @@ class Crawler(ABC):
     async def download(
         self,
         path: PurePath,
+        *,
+        etag_differs: Optional[bool] = None,
         mtime: Optional[datetime] = None,
         redownload: Optional[Redownload] = None,
         on_conflict: Optional[OnConflict] = None,
@@ -307,7 +313,14 @@ class Crawler(ABC):
             log.status("[bold bright_black]", "Ignored", fmt_path(path))
             return None
 
-        fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict)
+        fs_token = await self._output_dir.download(
+            path,
+            transformed_path,
+            etag_differs=etag_differs,
+            mtime=mtime,
+            redownload=redownload,
+            on_conflict=on_conflict
+        )
         if fs_token is None:
             log.explain("Answer: No")
             return None

@@ -1,12 +1,14 @@
 import asyncio
 import http.cookies
 import ssl
+from datetime import datetime
 from pathlib import Path, PurePath
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 import aiohttp
 import certifi
 from aiohttp.client import ClientTimeout
+from bs4 import Tag
 
 from ..auth import Authenticator
 from ..config import Config
@@ -15,6 +17,8 @@ from ..utils import fmt_real_path
 from ..version import NAME, VERSION
 from .crawler import Crawler, CrawlerSection
 
+ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
+
 
 class HttpCrawlerSection(CrawlerSection):
     def http_timeout(self) -> float:
@@ -169,6 +173,78 @@ class HttpCrawler(Crawler):
             log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
             log.warn(str(e))
 
+    @staticmethod
+    def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
+        """
+        Retrieves the hierarchy of headings associated with the given file link and constructs a folder
+        structure from them.
+
+        <h1> level headings usually only appear once and serve as the page title, so they would introduce
+        redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter.
+        """
+
+        def find_associated_headings(tag: Tag, level: int) -> PurePath:
+            if level == 0 or (level == 1 and drop_h1):
+                return PurePath()
+
+            level_heading = tag.find_previous(name=f"h{level}")
+
+            if level_heading is None:
+                return find_associated_headings(tag, level - 1)
+
+            folder_name = level_heading.getText().strip()
+            return find_associated_headings(level_heading, level - 1) / folder_name
+
+        # start at level <h3> because paragraph-level headings are usually too granular for folder names
+        return find_associated_headings(file_link, 3)
+
+    def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
+        """
+        If available, retrieves the entity tag for a given path which was stored in the previous report.
+        """
+        if not self._output_dir.prev_report:
+            return None
+
+        etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
+        return etags.get(str(path))
+
+    def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None:
+        """
+        Adds an entity tag for a given path to the report's custom values.
+        """
+        if not etag:
+            return
+
+        etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
+        etags[str(path)] = etag
+        self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
+
+    async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
+        """
+        Requests the ETag and Last-Modified headers of a resource via a HEAD request.
+        If no entity tag / modification date can be obtained, the according value will be None.
+        """
+        try:
+            async with self.session.head(resource_url) as resp:
+                if resp.status != 200:
+                    return None, None
+
+                etag_header = resp.headers.get("ETag")
+                last_modified_header = resp.headers.get("Last-Modified")
+                last_modified = None
+
+                if last_modified_header:
+                    try:
+                        # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives
+                        datetime_format = "%a, %d %b %Y %H:%M:%S GMT"
+                        last_modified = datetime.strptime(last_modified_header, datetime_format)
+                    except ValueError:
+                        # last_modified remains None
+                        pass
+
+                return etag_header, last_modified
+        except aiohttp.ClientError:
+            return None, None
+
     async def run(self) -> None:
         self._request_count = 0
         self._cookie_jar = aiohttp.CookieJar()
@@ -186,7 +262,12 @@ class HttpCrawler(Crawler):
                 connect=self._http_timeout,
                 sock_connect=self._http_timeout,
                 sock_read=self._http_timeout,
-            )
+            ),
+            # See https://github.com/aio-libs/aiohttp/issues/6626
+            # Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
+            # passed signature. Shibboleth will not accept the broken signature and authentication will
+            # fail.
+            requote_redirect_url=False
         ) as session:
             self.session = session
             try:

@@ -1,3 +1,9 @@
-from .kit_ilias_web_crawler import KitIliasWebCrawler, KitIliasWebCrawlerSection
+from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler,
+                                    KitIliasWebCrawlerSection)
 
-__all__ = ["KitIliasWebCrawler", "KitIliasWebCrawlerSection"]
+__all__ = [
+    "IliasWebCrawler",
+    "IliasWebCrawlerSection",
+    "KitIliasWebCrawler",
+    "KitIliasWebCrawlerSection",
+]

@@ -0,0 +1,40 @@
import asyncio
from typing import Any, Callable, Optional

import aiohttp

from ...logging import log
from ..crawler import AWrapped, CrawlError, CrawlWarning


def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
    def decorator(f: AWrapped) -> AWrapped:
        async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
            last_exception: Optional[BaseException] = None
            for round in range(attempts):
                try:
                    return await f(*args, **kwargs)
                except aiohttp.ContentTypeError:  # invalid content type
                    raise CrawlWarning("ILIAS returned an invalid content type")
                except aiohttp.TooManyRedirects:
                    raise CrawlWarning("Got stuck in a redirect loop")
                except aiohttp.ClientPayloadError as e:  # encoding or not enough bytes
                    last_exception = e
                except aiohttp.ClientConnectionError as e:  # e.g. timeout, disconnect, resolve failed, etc.
                    last_exception = e
                except asyncio.exceptions.TimeoutError as e:  # explicit http timeouts in HttpCrawler
                    last_exception = e
                log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
                log.explain(f"Last exception: {last_exception!r}")

            if last_exception:
                message = f"Error in I/O Operation: {last_exception!r}"
                if failure_is_error:
                    raise CrawlError(message) from last_exception
                else:
                    raise CrawlWarning(message) from last_exception
            raise CrawlError("Impossible return in ilias _iorepeat")

        return wrapper  # type: ignore
    return decorator

@@ -1,6 +1,10 @@
 from enum import Enum
 from typing import Optional
 
+import bs4
+
+from PFERD.utils import soupify
+
 _link_template_plain = "{{link}}"
 _link_template_fancy = """
 <!DOCTYPE html>
@@ -94,6 +98,71 @@ _link_template_internet_shortcut = """
 URL={{link}}
 """.strip()
 
+_learning_module_template = """
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <title>{{name}}</title>
+</head>
+
+<style>
+    * {
+        box-sizing: border-box;
+    }
+    .center-flex {
+        display: flex;
+        align-items: center;
+        justify-content: center;
+    }
+    .nav {
+        display: flex;
+        justify-content: space-between;
+    }
+</style>
+<body class="center-flex">
+{{body}}
+</body>
+</html>
+"""
+
+
+def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
+    # Seems to be comments, ignore those.
+    for elem in body.select(".il-copg-mob-fullscreen-modal"):
+        elem.decompose()
+
+    nav_template = """
+        <div class="nav">
+            {{left}}
+            {{right}}
+        </div>
+    """
+    if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
+        text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip()
+        left = f'<a href="{prev}">{text}</a>'
+    else:
+        left = "<span></span>"
+
+    if next and body.select_one(".ilc_page_rnav_RightNavigation"):
+        text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip()
+        right = f'<a href="{next}">{text}</a>'
+    else:
+        right = "<span></span>"
+
+    if top_nav := body.select_one(".ilc_page_tnav_TopNavigation"):
+        top_nav.replace_with(
+            soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode())
+        )
+
+    if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"):
+        bot_nav.replace_with(soupify(nav_template.replace(
+            "{{left}}", left).replace("{{right}}", right).encode())
+        )
+
+    body = body.prettify()
+    return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name)
+
 
 class Links(Enum):
     IGNORE = "ignore"
@@ -102,24 +171,24 @@ class Links(Enum):
     INTERNET_SHORTCUT = "internet-shortcut"
 
     def template(self) -> Optional[str]:
-        if self == self.FANCY:
+        if self == Links.FANCY:
             return _link_template_fancy
-        elif self == self.PLAINTEXT:
+        elif self == Links.PLAINTEXT:
             return _link_template_plain
-        elif self == self.INTERNET_SHORTCUT:
+        elif self == Links.INTERNET_SHORTCUT:
             return _link_template_internet_shortcut
-        elif self == self.IGNORE:
+        elif self == Links.IGNORE:
             return None
         raise ValueError("Missing switch case")
 
     def extension(self) -> Optional[str]:
-        if self == self.FANCY:
+        if self == Links.FANCY:
            return ".html"
-        elif self == self.PLAINTEXT:
+        elif self == Links.PLAINTEXT:
            return ".txt"
-        elif self == self.INTERNET_SHORTCUT:
+        elif self == Links.INTERNET_SHORTCUT:
            return ".url"
-        elif self == self.IGNORE:
+        elif self == Links.IGNORE:
            return None
         raise ValueError("Missing switch case")

@@ -12,6 +12,13 @@ _STYLE_TAG_CONTENT = """
         font-weight: bold;
     }
 
+    .row-flex {
+        display: flex;
+    }
+    .row-flex-wrap {
+        flex-wrap: wrap;
+    }
+
     .accordion-head {
         background-color: #f5f7fa;
         padding: 0.5rem 0;
@@ -82,9 +89,14 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
             dummy.decompose()
         if len(children) > 1:
             continue
-        if type(children[0]) == Comment:
+        if isinstance(children[0], Comment):
             dummy.decompose()
 
+    # Delete video figures, as they can not be internalized anyway
+    for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"):
+        if figure := video.find_parent("figure"):
+            figure.decompose()
+
     for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
         hrule_imposter.insert(0, soup.new_tag("hr"))

File diff suppressed because it is too large

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from enum import Enum from enum import Enum
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union, cast
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -15,20 +15,27 @@ TargetType = Union[str, int]
class IliasElementType(Enum): class IliasElementType(Enum):
BOOKING = "booking"
COURSE = "course"
EXERCISE = "exercise" EXERCISE = "exercise"
EXERCISE_FILES = "exercise_files" # own submitted files EXERCISE_FILES = "exercise_files" # own submitted files
TEST = "test" # an online test. Will be ignored currently.
FILE = "file" FILE = "file"
FOLDER = "folder" FOLDER = "folder"
FORUM = "forum" FORUM = "forum"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module"
LINK = "link" LINK = "link"
BOOKING = "booking" MEDIACAST_VIDEO = "mediacast_video"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEETING = "meeting" MEETING = "meeting"
MOB_VIDEO = "mob_video"
OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
SCORM_LEARNING_MODULE = "scorm_learning_module"
SURVEY = "survey" SURVEY = "survey"
VIDEO = "video" TEST = "test" # an online test. Will be ignored currently.
VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder"
VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
@dataclass @dataclass
@ -43,8 +50,13 @@ class IliasPageElement:
regexes = [ regexes = [
r"eid=(?P<id>[0-9a-z\-]+)", r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)", r"file_(?P<id>\d+)",
r"copa_(?P<id>\d+)",
r"fold_(?P<id>\d+)",
r"frm_(?P<id>\d+)",
r"exc_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)", r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)" r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
] ]
for regex in regexes: for regex in regexes:
@ -55,6 +67,52 @@ class IliasPageElement:
log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.") log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
return self.url return self.url
@staticmethod
def create_new(
typ: IliasElementType,
url: str,
name: str,
mtime: Optional[datetime] = None,
description: Optional[str] = None,
skip_sanitize: bool = False
) -> 'IliasPageElement':
if typ == IliasElementType.MEETING:
normalized = IliasPageElement._normalize_meeting_name(name)
log.explain(f"Normalized meeting name from {name!r} to {normalized!r}")
name = normalized
if not skip_sanitize:
name = _sanitize_path_name(name)
return IliasPageElement(typ, url, name, mtime, description)
@staticmethod
def _normalize_meeting_name(meeting_name: str) -> str:
"""
Normalizes meeting names, which have a relative time as their first part,
to their date in ISO format.
"""
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
split_delimiter = ", "
# We have a meeting day without time
date_portion_str = meeting_name.split(split_delimiter)[0]
date_portion = demangle_date(date_portion_str)
# We failed to parse the date, bail out
if not date_portion:
return meeting_name
# Replace the first section with the absolute date
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
@dataclass @dataclass
class IliasDownloadForumData: class IliasDownloadForumData:
@ -71,6 +129,14 @@ class IliasForumThread:
mtime: Optional[datetime] mtime: Optional[datetime]
@dataclass
class IliasLearningModulePage:
title: str
content: Tag
next_url: Optional[str]
previous_url: Optional[str]
class IliasPage: class IliasPage:
def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
@ -79,6 +145,12 @@ class IliasPage:
self._page_type = source_element.type if source_element else None self._page_type = source_element.type if source_element else None
self._source_name = source_element.name if source_element else "" self._source_name = source_element.name if source_element else ""
@staticmethod
def is_root_page(soup: BeautifulSoup) -> bool:
if permalink := IliasPage.get_soup_permalink(soup):
return "goto.php?target=root_" in permalink
return False
def get_child_elements(self) -> List[IliasPageElement]: def get_child_elements(self) -> List[IliasPageElement]:
""" """
Return all child page elements you can find here. Return all child page elements you can find here.
@ -86,9 +158,9 @@ class IliasPage:
if self._is_video_player(): if self._is_video_player():
log.explain("Page is a video player, extracting URL") log.explain("Page is a video player, extracting URL")
return self._player_to_video() return self._player_to_video()
if self._is_video_listing(): if self._is_opencast_video_listing():
log.explain("Page is a video listing, searching for elements") log.explain("Page is an opencast video listing, searching for elements")
return self._find_video_entries() return self._find_opencast_video_entries()
if self._is_exercise_file(): if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements") log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries() return self._find_exercise_entries()
@ -98,9 +170,25 @@ class IliasPage:
if self._is_content_page(): if self._is_content_page():
log.explain("Page is a content page, searching for elements") log.explain("Page is a content page, searching for elements")
return self._find_copa_entries() return self._find_copa_entries()
if self._is_info_tab():
log.explain("Page is info tab, searching for elements")
return self._find_info_tab_entries()
log.explain("Page is a normal folder, searching for elements") log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries() return self._find_normal_entries()
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = self._soup.find(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement.create_new(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]: def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool: def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
@ -126,6 +214,34 @@ class IliasPage:
return BeautifulSoup(raw_html, "html.parser") return BeautifulSoup(raw_html, "html.parser")
def get_learning_module_data(self) -> Optional[IliasLearningModulePage]:
if not self._is_learning_module_page():
return None
content = self._soup.select_one("#ilLMPageContent")
title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip()
return IliasLearningModulePage(
title=title,
content=content,
next_url=self._find_learning_module_next(),
previous_url=self._find_learning_module_prev()
)
def _find_learning_module_next(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_rnavlink_RightNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def _find_learning_module_prev(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_lnavlink_LeftNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def get_download_forum_data(self) -> Optional[IliasDownloadForumData]: def get_download_forum_data(self) -> Optional[IliasDownloadForumData]:
form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x}) form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x})
if not form: if not form:
@ -152,12 +268,18 @@ class IliasPage:
if self._is_ilias_opencast_embedding(): if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding") log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0] return self.get_child_elements()[0]
if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED: if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination") log.explain("Unwrapping video pagination")
return self._find_video_entries_paginated()[0] return self._find_opencast_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings(): if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings") log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url() return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected():
if self._page_type != IliasElementType.INFO_TAB:
log.explain("Selecting content tab")
return self._select_content_page_url()
else:
log.explain("Crawling info tab, skipping content select")
return None
def _is_forum_page(self) -> bool:
@@ -170,7 +292,7 @@ class IliasPage:
def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup)
-def _is_video_listing(self) -> bool:
+def _is_opencast_video_listing(self) -> bool:
if self._is_ilias_opencast_embedding():
return True
@@ -202,23 +324,58 @@ class IliasPage:
return False
def _is_personal_desktop(self) -> bool:
-return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
+return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower()
def _is_content_page(self) -> bool:
-link = self._soup.find(id="current_perma_link")
-if not link:
-return False
-return "target=copa_" in link.get("value")
+if link := self.get_permalink():
+return "target=copa_" in link
+return False
+def _is_learning_module_page(self) -> bool:
+if link := self.get_permalink():
+return "target=pg_" in link
+return False
def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
-element = self._soup.find("a", attrs={"href": lambda x: x and "crs_next_sess=1" in x})
+element = self._soup.find(
+"a",
+attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
+)
if not element:
return None
link = self._abs_url_from_link(element)
-return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings")
+return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings")
def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None
def _is_info_tab(self) -> bool:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _is_course_overview_page(self) -> bool:
return "baseClass=ilmembershipoverviewgui" in self._page_url
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find(
id="tab_view_content",
attrs={"class": lambda x: x is not None and "active" not in x}
)
# Already selected (or not found)
if not tab:
return None
link = tab.find("a")
if link:
link = self._abs_url_from_link(link)
return IliasPageElement.create_new(IliasElementType.FOLDER, link, "select content page")
_unexpected_html_warning()
log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.")
log.warn_contd("PFERD might not find content on the course's main page.")
return None
def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javascript
@@ -243,14 +400,16 @@ class IliasPage:
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
-return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
+return [
+IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)
+]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
-items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
+items.append(IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, full_name))
return items
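For orientation, a sketch of the stream structure these subscripts assume; the field names are inferred from the accesses above, and a real paella config may carry additional keys:

streams = [
    {"content": "presenter", "sources": {"mp4": [{"src": "https://example.com/presenter.mp4"}]}},
    {"content": "presentation", "sources": {"mp4": [{"src": "https://example.com/presentation.mp4"}]}},
]
# The single-stream case collapses to the lone url, mirroring the code above
video_url = streams[0]["sources"]["mp4"][0]["src"]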
@@ -265,17 +424,26 @@ class IliasPage:
link = self._abs_url_from_link(correct_link)
-return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads")
+return IliasPageElement.create_new(IliasElementType.FORUM, link, "show all forum threads")
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
-titles: List[Tag] = self._soup.select(".il-item-title")
+titles: List[Tag] = self._soup.select("#block_pditems_0 .il-item-title")
for title in titles:
link = title.find("a")
if not link:
log.explain(f"Skipping offline item: {title.getText().strip()!r}")
continue
name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link)
if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url:
# Configure button/link does not have anything interesting
continue
type = self._find_type_from_link(name, link, url)
if not type:
_unexpected_html_warning()
@@ -288,7 +456,7 @@ class IliasPage:
url = re.sub(r"(target=file_\d+)", r"\1_download", url)
log.explain("Rewired file URL to include download part")
-items.append(IliasPageElement(type, url, name))
+items.append(IliasPageElement.create_new(type, url, name))
return items
@@ -298,18 +466,36 @@ class IliasPage:
for link in links:
url = self._abs_url_from_link(link)
-name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
+name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "")
+name = _sanitize_path_name(name)
if "file_id" not in url:
_unexpected_html_warning()
log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
continue
-items.append(IliasPageElement(IliasElementType.FILE, url, name))
+items.append(IliasPageElement.create_new(IliasElementType.FILE, url, name))
return items
-def _find_video_entries(self) -> List[IliasPageElement]:
+def _find_info_tab_entries(self) -> List[IliasPageElement]:
items = []
links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")
for link in links:
if "cmdClass=ilobjcoursegui" not in link["href"]:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.getText())
))
return items
def _find_opencast_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated
@@ -329,27 +515,29 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next")
-return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
+return [
+IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")
+]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
-if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
+if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
-return self._find_video_entries_paginated()
+return self._find_opencast_video_entries_paginated()
-return self._find_video_entries_no_paging()
+return self._find_opencast_video_entries_no_paging()
-def _find_video_entries_paginated(self) -> List[IliasPageElement]:
+def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
-return self._find_video_entries_no_paging()
+return self._find_opencast_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None:
log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
-return self._find_video_entries_no_paging()
+return self._find_opencast_video_entries_no_paging()
table_id = id_match.group(1)
@@ -358,9 +546,9 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
-return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
+return [IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]
-def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
+def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
@@ -372,11 +560,11 @@ class IliasPage:
results: List[IliasPageElement] = []
for link in video_links:
-results.append(self._listed_video_to_element(link))
+results.append(self._listed_opencast_video_to_element(link))
return results
-def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
+def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
@@ -387,8 +575,8 @@ class IliasPage:
modification_string = link.parent.parent.parent.select_one(
f"td.std:nth-child({index})"
).getText().strip()
-if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
-modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
+if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string):
+modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M")
break
if modification_time is None:
@@ -403,7 +591,9 @@ class IliasPage:
video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}")
-return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
+return IliasPageElement.create_new(
+IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
+)
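The reworked date matching is easy to check in isolation (the cell text below is hypothetical):

import re
from datetime import datetime

modification_string = "Some Lecture 08.11.2024 10:15"  # hypothetical table cell text
if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string):
    modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M")
    assert modification_time == datetime(2024, 11, 8, 10, 15)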
def _find_exercise_entries(self) -> List[IliasPageElement]:
if self._soup.find(id="tab_submission"):
@@ -437,7 +627,7 @@ class IliasPage:
if date is None:
log.warn(f"Date parsing failed for exercise entry {name!r}")
-results.append(IliasPageElement(
+results.append(IliasPageElement.create_new(
IliasElementType.FILE,
self._abs_url_from_link(link),
name,
@@ -470,22 +660,22 @@ class IliasPage:
# Two divs, side by side. Left is the name, right is the link ==> get left
# sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip()
-file_name = _sanitize_path_name(file_name)
url = self._abs_url_from_link(file_link)
log.explain(f"Found exercise entry {file_name!r}")
-results.append(IliasPageElement(
+results.append(IliasPageElement.create_new(
IliasElementType.FILE,
url,
-container_name + "/" + file_name,
-None # We do not have any timestamp
+_sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name),
+mtime=None, # We do not have any timestamp
+skip_sanitize=True
))
# Find all links to file listings (e.g. "Submitted Files" for groups)
file_listings: List[Tag] = container.findAll(
name="a",
# download links contain the given command class
-attrs={"href": lambda x: x and "cmdClass=ilexsubmissionfilegui" in x}
+attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()}
)
# Add each listing as a new
@@ -496,14 +686,15 @@ class IliasPage:
label_container: Tag = parent_container.find(
attrs={"class": lambda x: x and "control-label" in x}
)
-file_name = _sanitize_path_name(label_container.getText().strip())
+file_name = label_container.getText().strip()
url = self._abs_url_from_link(listing)
log.explain(f"Found exercise detail {file_name!r} at {url}")
-results.append(IliasPageElement(
+results.append(IliasPageElement.create_new(
IliasElementType.EXERCISE_FILES,
url,
-container_name + "/" + file_name,
-None # we do not have any timestamp
+_sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name),
+None, # we do not have any timestamp
+skip_sanitize=True
))
return results
@@ -511,12 +702,18 @@ class IliasPage:
def _find_normal_entries(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
links: List[Tag] = []
# Fetch all links and throw them to the general interpreter
-links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")
+if self._is_course_overview_page():
+log.explain("Page is a course overview page, adjusting link selector")
+links.extend(self._soup.select(".il-item-title > a"))
+else:
+links.extend(self._soup.select("a.il_ContainerItemTitle"))
for link in links:
abs_url = self._abs_url_from_link(link)
-parents = self._find_upwards_folder_hierarchy(link)
+# Make sure parents are sanitized. We do not want accidental parents
+parents = [_sanitize_path_name(x) for x in self._find_upwards_folder_hierarchy(link)]
if parents:
element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText())
@@ -534,21 +731,94 @@ class IliasPage:
if not element_type:
continue
if element_type == IliasElementType.MEETING:
normalized = _sanitize_path_name(self._normalize_meeting_name(element_name))
log.explain(f"Normalized meeting name from {element_name!r} to {normalized!r}")
element_name = normalized
elif element_type == IliasElementType.FILE:
result.append(self._file_to_element(element_name, abs_url, link))
continue
log.explain(f"Found {element_name!r}")
-result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
+result.append(IliasPageElement.create_new(
+element_type,
+abs_url,
+element_name,
+description=description,
+skip_sanitize=True
+))
result += self._find_cards()
result += self._find_mediacast_videos()
result += self._find_mob_videos()
return result
def _find_mediacast_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
element_name = _sanitize_path_name(
elem.select_one(".ilPlayerPreviewDescription").getText().strip()
)
if not element_name.endswith(".mp4"):
# just to make sure it has some kinda-alrightish ending
element_name = element_name + ".mp4"
video_element = elem.find(name="video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")),
name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
))
return videos
def _find_mob_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for figure in self._soup.select("figure.ilc_media_cont_MediaContainerHighlighted"):
title = figure.select_one("figcaption").getText().strip() + ".mp4"
video_element = figure.select_one("video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mob video '{title}'")
continue
url = None
for source in video_element.select("source"):
if source.get("type", "") == "video/mp4":
url = source.get("src")
break
if url is None:
_unexpected_html_warning()
log.warn_contd(f"No <source> element found for mob video '{title}'")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO,
url=self._abs_url_from_relative(url),
name=_sanitize_path_name(title),
mtime=None
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
return None
meta_tag: Tag = description_td.find_all("p")[-1]
if not meta_tag:
return None
updated_str = meta_tag.getText().strip().replace("\n", " ")
updated_str = re.sub(".+?: ", "", updated_str)
return demangle_date(updated_str)
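The re.sub strips everything up to and including the first ": ", leaving only the date for demangle_date; for example, with a hypothetical label:

import re

updated_str = "Last update: 28.10.2024"  # hypothetical metadata text; the real label may differ
assert re.sub(".+?: ", "", updated_str) == "28.10.2024"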
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
"""
Returns whether a file is part of an expanded meeting.
@@ -586,11 +856,14 @@ class IliasPage:
# ILIAS has proper accordions and weird blocks that look like normal headings,
# but some JS later transforms them into an accordion.
-# This is for these weird JS-y blocks
+# This is for these weird JS-y blocks and custom item groups
if "ilContainerItemsContainer" in parent.get("class"):
data_store_url = parent.parent.get("data-store-url", "").lower()
is_custom_item_group = "baseclass=ilcontainerblockpropertiesstoragegui" in data_store_url \
and "cont_block_id=" in data_store_url
# I am currently under the impression that *only* those JS blocks have an
# ilNoDisplay class.
-if "ilNoDisplay" not in parent.get("class"):
+if not is_custom_item_group and "ilNoDisplay" not in parent.get("class"):
continue
prev: Tag = parent.findPreviousSibling("div")
if "ilContainerBlockHeader" in prev.get("class"):
@@ -650,7 +923,9 @@ class IliasPage:
full_path = name + "." + file_type
log.explain(f"Found file {full_path!r}")
-return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
+return IliasPageElement.create_new(
IliasElementType.FILE, url, full_path, modification_date, skip_sanitize=True
)
def _find_cards(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
@@ -667,7 +942,7 @@ class IliasPage:
log.warn_contd(f"Could not extract type for {title}")
continue
-result.append(IliasPageElement(type, url, name))
+result.append(IliasPageElement.create_new(type, url, name))
card_button_tiles: List[Tag] = self._soup.select(".card-title button")
@@ -685,14 +960,18 @@ class IliasPage:
"div",
attrs={"class": lambda x: x and "caption" in x},
)
-description = caption_parent.find_next_sibling("div").getText().strip()
+caption_container = caption_parent.find_next_sibling("div")
+if caption_container:
+description = caption_container.getText().strip()
+else:
+description = None
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {button}")
continue
-result.append(IliasPageElement(type, url, name, description=description))
+result.append(IliasPageElement.create_new(type, url, name, description=description))
return result
@@ -715,14 +994,18 @@ class IliasPage:
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
-if "opencast" in icon["class"]:
-return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
+if "opencast" in icon["class"] or "xoct" in icon["class"]:
+return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "grp" in icon["class"]:
return IliasElementType.FOLDER
if "webr" in icon["class"]:
return IliasElementType.LINK
if "book" in icon["class"]:
return IliasElementType.BOOKING
if "crsr" in icon["class"]:
return IliasElementType.COURSE
if "frm" in icon["class"]:
return IliasElementType.FORUM
if "sess" in icon["class"]:
@@ -735,6 +1018,10 @@ class IliasPage:
return IliasElementType.FOLDER
if "svy" in icon["class"]:
return IliasElementType.SURVEY
if "file" in icon["class"]:
return IliasElementType.FILE
if "mcst" in icon["class"]:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
@@ -773,6 +1060,28 @@ class IliasPage:
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
if "baseClass=ilLMPresentationGUI" in parsed_url.query:
return IliasElementType.LEARNING_MODULE
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE
# other universities might have content type specified in URL path
if "_file_" in parsed_url.path:
return IliasElementType.FILE
if "_fold_" in parsed_url.path or "_copa_" in parsed_url.path:
return IliasElementType.FOLDER
if "_frm_" in parsed_url.path:
return IliasElementType.FORUM
if "_exc_" in parsed_url.path:
return IliasElementType.EXERCISE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
@@ -814,7 +1123,11 @@ class IliasPage:
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
-if img_tag is None and found_parent.find("a", attrs={"href": lambda x: x and "crs_next_sess=" in x}):
+is_session_expansion_button = found_parent.find(
+"a",
+attrs={"href": lambda x: x and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
+)
+if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content")
return None
@@ -824,7 +1137,7 @@ class IliasPage:
return None
if "opencast" in str(img_tag["alt"]).lower():
-return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
+return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
@@ -844,34 +1157,52 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_tst.svg"):
return IliasElementType.TEST
if str(img_tag["src"]).endswith("icon_mcst.svg"):
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_sahs.svg"):
return IliasElementType.SCORM_LEARNING_MODULE
return IliasElementType.FOLDER
@staticmethod
-def _normalize_meeting_name(meeting_name: str) -> str:
-"""
-Normalizes meeting names, which have a relative time as their first part,
-to their date in ISO format.
-"""
-# This checks whether we can reach a `:` without passing a `-`
-if re.search(r"^[^-]+: ", meeting_name):
-# Meeting name only contains date: "05. Jan 2000:"
-split_delimiter = ":"
-else:
-# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
-split_delimiter = ", "
-# We have a meeting day without time
-date_portion_str = meeting_name.split(split_delimiter)[0]
-date_portion = demangle_date(date_portion_str)
-# We failed to parse the date, bail out
-if not date_portion:
-return meeting_name
-# Replace the first section with the absolute date
-rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
-return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
+def is_logged_in(soup: BeautifulSoup) -> bool:
+# Normal ILIAS pages
+mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
+if mainbar is not None:
+login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
+shib_login = soup.find(id="button_shib_login")
+return not login_button and not shib_login
+# Personal Desktop
+if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
+return True
+# Empty personal desktop has zero (0) markers. Match on the text...
+if alert := soup.select_one(".alert-info"):
+text = alert.getText().lower()
+if "you have not yet selected any favourites" in text:
+return True
+if "sie haben aktuell noch keine favoriten ausgewählt" in text:
+return True
+# Video listing embeds do not have complete ILIAS html. Try to match them by
+# their video listing table
+video_table = soup.find(
+recursive=True,
+name="table",
+attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
+)
+if video_table is not None:
+return True
+# The individual video player wrapper page has nothing of the above.
+# Match it by its playerContainer.
+if soup.select_one("#playerContainer") is not None:
+return True
+return False
+def get_permalink(self) -> Optional[str]:
+return IliasPage.get_soup_permalink(self._soup)
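A sketch of the first is_logged_in heuristic against hypothetical markup: a metabar without a login link (and no shibboleth button) counts as logged in. It assumes IliasPage from this module:

from bs4 import BeautifulSoup

logged_in = BeautifulSoup('<div class="il-maincontrols-metabar"><a href="logout.php">Logout</a></div>', "html.parser")
logged_out = BeautifulSoup('<div class="il-maincontrols-metabar"><a href="login.php">Login</a></div>', "html.parser")
assert IliasPage.is_logged_in(logged_in)
assert not IliasPage.is_logged_in(logged_out)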
def _abs_url_from_link(self, link_tag: Tag) -> str:
"""
@@ -885,6 +1216,13 @@ class IliasPage:
"""
return urljoin(self._page_url, relative_url)
@staticmethod
def get_soup_permalink(soup: BeautifulSoup) -> Optional[str]:
perma_link_element: Tag = soup.select_one(".il-footer-permanent-url > a")
if not perma_link_element or not perma_link_element.get("href"):
return None
return perma_link_element.get("href")
def _unexpected_html_warning() -> None:
log.warn("Encountered unexpected HTML structure, ignoring element.")

View File

@@ -1,982 +1,37 @@
-import asyncio
-import re
-from collections.abc import Awaitable, Coroutine
-from pathlib import PurePath
-from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
-import aiohttp
-import yarl
-from aiohttp import hdrs
-from bs4 import BeautifulSoup, Tag
-from ...auth import Authenticator, TfaAuthenticator
+from typing import Dict, Literal
+from ...auth import Authenticator
from ...config import Config
-from ...logging import ProgressBar, log
-from ...output_dir import FileSink, Redownload
-from ...utils import fmt_path, soupify, url_set_query_param
-from ..crawler import AWrapped, CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
-from ..http_crawler import HttpCrawler, HttpCrawlerSection
-from .file_templates import Links
-from .ilias_html_cleaner import clean, insert_base_markup
-from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasPage, IliasPageElement,
-_sanitize_path_name, parse_ilias_forum_export)
+from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
+from .shibboleth_login import ShibbolethLogin
-TargetType = Union[str, int]
_ILIAS_URL = "https://ilias.studium.kit.edu"
-class KitShibbolethBackgroundLoginSuccessful():
+class KitShibbolethBackgroundLoginSuccessful:
pass
-class KitIliasWebCrawlerSection(HttpCrawlerSection):
-def target(self) -> TargetType:
-target = self.s.get("target")
-if not target:
-self.missing_value("target")
-if re.fullmatch(r"\d+", target):
-# Course id
+class KitIliasWebCrawlerSection(IliasWebCrawlerSection):
+def base_url(self) -> str:
+return _ILIAS_URL
+def login(self) -> Literal["shibboleth"]:
+return "shibboleth"
return int(target)
if target == "desktop":
# Full personal desktop
return target
if target.startswith(_ILIAS_URL):
# ILIAS URL
return target
self.invalid_value("target", target, "Should be <course id | desktop | kit ilias URL>")
def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def links(self) -> Links:
type_str: Optional[str] = self.s.get("links")
if type_str is None:
return Links.FANCY
try:
return Links.from_string(type_str)
except ValueError as e:
self.invalid_value("links", type_str, str(e).capitalize())
def link_redirect_delay(self) -> int:
return self.s.getint("link_redirect_delay", fallback=-1)
def videos(self) -> bool:
return self.s.getboolean("videos", fallback=False)
def forums(self) -> bool:
return self.s.getboolean("forums", fallback=False)
-_DIRECTORY_PAGES: Set[IliasElementType] = set([
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER,
IliasElementType.MEETING,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
_VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.VIDEO,
IliasElementType.VIDEO_PLAYER,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
])
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
def decorator(f: AWrapped) -> AWrapped:
async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
last_exception: Optional[BaseException] = None
for round in range(attempts):
try:
return await f(*args, **kwargs)
except aiohttp.ContentTypeError: # invalid content type
raise CrawlWarning("ILIAS returned an invalid content type")
except aiohttp.TooManyRedirects:
raise CrawlWarning("Got stuck in a redirect loop")
except aiohttp.ClientPayloadError as e: # encoding or not enough bytes
last_exception = e
except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc.
last_exception = e
except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler
last_exception = e
log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
if last_exception:
message = f"Error in I/O Operation: {last_exception}"
if failure_is_error:
raise CrawlError(message) from last_exception
else:
raise CrawlWarning(message) from last_exception
raise CrawlError("Impossible return in ilias _iorepeat")
return wrapper # type: ignore
return decorator
def _wrap_io_in_warning(name: str) -> Callable[[AWrapped], AWrapped]:
"""
Wraps any I/O exception in a CrawlWarning.
"""
return _iorepeat(1, name)
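_iorepeat is a decorator factory: it is applied with a retry count and an operation name, exactly as the crawl and download methods further down do. A sketch of the shape (body elided):

@_iorepeat(3, "downloading file")  # retries transient aiohttp/timeout errors up to 3 times
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
    ...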
# Crawler control flow:
#
# crawl_desktop -+
# |
# crawl_course --+
# |
# @_io_repeat | # retries internally (before the bar)
# +- crawl_url <-+
# |
# |
# | @_wrap_io_exception # does not need to retry as children acquire bars
# +> crawl_ilias_element -+
# ^ |
# | @_io_repeat | # retries internally (before the bar)
# +- crawl_ilias_page <---+
# | |
# +> get_page | # Handles and retries authentication
# |
# @_io_repeat | # retries internally (before the bar)
# +- download_link <---+
# | |
# +> resolve_target | # Handles and retries authentication
# |
# @_io_repeat | # retries internally (before the bar)
# +- download_video <---+
# | |
# | @_io_repeat | # retries internally (before the bar)
# +- download_file <---+
# |
# +> stream_from_url # Handles and retries authentication
-class KitIliasWebCrawler(HttpCrawler):
+class KitIliasWebCrawler(IliasWebCrawler):
def __init__(
self,
name: str,
section: KitIliasWebCrawlerSection,
config: Config,
-authenticators: Dict[str, Authenticator]
+authenticators: Dict[str, Authenticator],
):
-# Setting a main authenticator for cookie sharing
-auth = section.auth(authenticators)
-super().__init__(name, section, config, shared_auth=auth)
-if section.tasks() > 1:
-log.warn("""
-Please avoid using too many parallel requests as these are the KIT ILIAS
-instance's greatest bottleneck.
-""".strip())
-self._shibboleth_login = KitShibbolethLogin(
-auth,
+super().__init__(name, section, config, authenticators)
+self._shibboleth_login = ShibbolethLogin(
+_ILIAS_URL,
+self._auth,
section.tfa_auth(authenticators),
)
self._base_url = _ILIAS_URL
self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay()
self._links = section.links()
self._videos = section.videos()
self._forums = section.forums()
self._visited_urls: Dict[str, PurePath] = dict()
async def _run(self) -> None:
if isinstance(self._target, int):
log.explain_topic(f"Inferred crawl target: Course with id {self._target}")
await self._crawl_course(self._target)
elif self._target == "desktop":
log.explain_topic("Inferred crawl target: Personal desktop")
await self._crawl_desktop()
else:
log.explain_topic(f"Inferred crawl target: URL {self._target}")
await self._crawl_url(self._target)
async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course
root_url = url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
appendix = r"ILIAS\PersonalDesktop\PDMainBarProvider|mm_pd_sel_items"
appendix = appendix.encode("ASCII").hex()
await self._crawl_url(self._base_url + "/gs_content.php?item=" + appendix)
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
return
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
while next_stage_url:
soup = await self._get_page(next_stage_url)
if current_parent is None and expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"):
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(PurePath("."), element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
async def _handle_ilias_page(
self,
url: str,
parent: IliasPageElement,
path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(path)
if not maybe_cl:
return None
return self._crawl_ilias_page(url, parent, maybe_cl)
@anoncritical
async def _crawl_ilias_page(
self,
url: str,
parent: IliasPageElement,
cl: CrawlToken,
) -> None:
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling folder")
async def gather_elements() -> None:
elements.clear()
async with cl:
next_stage_url: Optional[str] = url
current_parent = parent
while next_stage_url:
soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(cl.path, description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
for element in elements:
if handle := await self._handle_ilias_element(cl.path, element):
tasks.append(asyncio.create_task(handle))
# And execute them
await self.gather(tasks)
# These decorators only apply *to this method* and *NOT* to the returned
# awaitables!
# This method does not await the handlers but returns them instead.
# This ensures one level is handled at a time and name deduplication
# works correctly.
@anoncritical
async def _handle_ilias_element(
self,
parent_path: PurePath,
element: IliasPageElement,
) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls[element.url] = parent_path
element_path = PurePath(parent_path, element.name)
if element.type in _VIDEO_ELEMENTS:
if not self._videos:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'videos')"
)
return None
if element.type == IliasElementType.FILE:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.FORUM:
if not self._forums:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'forums')"
)
return None
return await self._handle_forum(element, element_path)
elif element.type == IliasElementType.TEST:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](tests contain no relevant data)"
)
return None
elif element.type == IliasElementType.SURVEY:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](surveys contain no relevant data)"
)
return None
elif element.type == IliasElementType.LINK:
return await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER:
return await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path)
else:
# This will retry it a few times, failing every time. It doesn't make any network
# requests, so that's fine.
raise CrawlWarning(f"Unknown element type: {element.type!r}")
async def _handle_link(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
link_template_maybe = self._links.template()
link_extension = self._links.extension()
if not link_template_maybe or not link_extension:
log.explain("Answer: No")
return None
else:
log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_link(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(3, "resolving link")
async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None:
async with dl as (bar, sink):
export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML")
real_url = await self._resolve_link_target(export_url)
self._write_link_content(link_template, real_url, element.name, element.description, sink)
def _write_link_content(
self,
link_template: str,
url: str,
name: str,
description: Optional[str],
sink: FileSink,
) -> None:
content = link_template
content = content.replace("{{link}}", url)
content = content.replace("{{name}}", name)
content = content.replace("{{description}}", str(description))
content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay))
sink.file.write(content.encode("utf-8"))
sink.done()
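The placeholders are plain string substitutions, so any template works; a sketch with a hypothetical one-line template:

link_template = '<a href="{{link}}">{{name}}</a><!-- {{description}} / delay {{redirect_delay}}s -->'
content = (
    link_template
    .replace("{{link}}", "https://example.com")
    .replace("{{name}}", "Example")
    .replace("{{description}}", "A link")
    .replace("{{redirect_delay}}", "-1")
)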
async def _handle_booking(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
log.explain_topic(f"Decision: Crawl Booking Link {fmt_path(element_path)}")
log.explain(f"Links type is {self._links}")
link_template_maybe = self._links.template()
link_extension = self._links.extension()
if not link_template_maybe or not link_extension:
log.explain("Answer: No")
return None
else:
log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(1, "downloading description")
async def _download_description(self, parent_path: PurePath, description: BeautifulSoup) -> None:
path = parent_path / "Description.html"
dl = await self.download(path, redownload=Redownload.ALWAYS)
if not dl:
return
async with dl as (bar, sink):
description = clean(insert_base_markup(description))
sink.file.write(description.prettify().encode("utf-8"))
sink.done()
@anoncritical
@_iorepeat(3, "resolving booking")
async def _download_booking(
self,
element: IliasPageElement,
link_template: str,
dl: DownloadToken,
) -> None:
async with dl as (bar, sink):
self._write_link_content(link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str:
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
await self._authenticate()
async with self.session.get(export_url, allow_redirects=False) as resp:
# No redirect means we were authenticated
if hdrs.LOCATION not in resp.headers:
return soupify(await resp.read()).select_one("a").get("href").strip()
raise CrawlError("resolve_link_target failed even after authenticating")
async def _handle_video(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
# Copy old mapping as it is likely still relevant
if self.prev_report:
self.report.add_custom_value(
str(element_path),
self.prev_report.get_custom_value(str(element_path))
)
# A video might contain other videos, so let's "crawl" the video first
# to ensure rate limits apply. This must be a download as *this token*
# is re-used if the video consists of a single stream. In that case the
# file name is used and *not* the stream name the ilias html parser reported
# to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing videos as known so they do not get deleted
# during cleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
await self.download(video)
return None
return self._download_video(element_path, element, maybe_dl)
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(str(video_path))
if not custom_value:
return []
names = cast(List[str], custom_value)
folder = video_path.parent
return [PurePath(folder, name) for name in names]
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._to_local_video_path(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
return True
log.explain("Missing at least one video, continuing with requests!")
return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical
@_iorepeat(3, "downloading video")
async def _download_video(
self,
original_path: PurePath,
element: IliasPageElement,
dl: DownloadToken
) -> None:
stream_elements: List[IliasPageElement] = []
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
stream_elements = page.get_child_elements()
if len(stream_elements) > 1:
log.explain(f"Found multiple video streams for {element.name}")
else:
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
transformed_path = self._to_local_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
# We do not have a local cache yet
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), [original_path.name])
return
contained_video_paths: List[str] = []
for stream_element in stream_elements:
video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
continue
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), contained_video_paths)
async def _handle_file(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl:
return None
return self._download_file(element, maybe_dl)
@anoncritical
@_iorepeat(3, "downloading file")
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None
async with dl as (bar, sink):
await self._stream_from_url(element.url, sink, bar, is_video=False)
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None:
async def try_stream() -> bool:
async with self.session.get(url, allow_redirects=is_video) as resp:
if not is_video:
# Redirect means we weren't authenticated
if hdrs.LOCATION in resp.headers:
return False
# we wanted a video but got HTML
if is_video and "html" in resp.content_type:
return False
if resp.content_length:
bar.set_total(resp.content_length)
async for data in resp.content.iter_chunked(1024):
sink.file.write(data)
bar.advance(len(data))
sink.done()
return True
auth_id = await self._current_auth_id()
if await try_stream():
return
await self.authenticate(auth_id)
if not await try_stream():
raise CrawlError("File streaming failed after authenticate()")
async def _handle_forum(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
return self._crawl_forum(element, maybe_cl)
@_iorepeat(3, "crawling forum")
@anoncritical
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasForumThread] = []
async with cl:
next_stage_url = element.url
while next_stage_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, None)
if next := page.get_next_stage_element():
next_stage_url = next.url
else:
break
download_data = page.get_download_forum_data()
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
elements = []
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
elements.sort(key=lambda elem: elem.title)
tasks: List[Awaitable[None]] = []
for elem in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem)))
# And execute them
await self.gather(tasks)
@anoncritical
@_iorepeat(3, "saving forum thread")
async def _download_forum_thread(
self,
parent_path: PurePath,
element: IliasForumThread,
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
content = element.title_tag.prettify()
content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8"))
sink.done()
async def _get_page(self, url: str) -> BeautifulSoup:
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
soup = soupify(await request.read())
if self._is_logged_in(soup):
return soup
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url) as request:
soup = soupify(await request.read())
if self._is_logged_in(soup):
return soup
raise CrawlError("get_page failed even after authenticating")
async def _post_authenticated(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> BeautifulSoup:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
async with self.session.post(url, data=form_data(), allow_redirects=False) as request:
if request.status == 200:
return await request.read()
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.post(url, data=data, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@ _iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
class KitShibbolethLogin:
"""
Login via KIT's shibboleth system.
"""
def __init__(self, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]) -> None:
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
        Performs the ILIAS Shibboleth authentication dance and saves the login
        cookies it receives.

        This function should only be called when it is detected that you're
        not logged in. The cookies obtained should be good for a few minutes,
        maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{_ILIAS_URL}/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"csrf_token": csrf_token
}
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self,
session: aiohttp.ClientSession,
soup: BeautifulSoup
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
"csrf_token": csrf_token
}
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None


async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())


async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
    aiohttp unescapes '/' and ':' in URL query parameters, which is not RFC-compliant
    and is rejected by Shibboleth. Thanks a lot. So now we unroll the requests manually,
    parse location headers and build encoded URL objects ourselves... Who thought
    mangling the location header was a good idea?
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
        # If shib still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
# Probably not needed anymore, but might catch a few weird situations with a nicer message
if not location or not as_yarl.host:
raise CrawlWarning(f"Login failed (2), no location header present at {correct_url}")
correct_url = yarl.URL.build(
scheme=as_yarl.scheme,
host=as_yarl.host,
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())
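
For orientation, here is a minimal, hypothetical driver for this login flow. `StubAuth` is a stand-in for PFERD's `Authenticator` interface (only the methods used here are sketched) and is not part of the real API; it assumes `KitShibbolethLogin` from this module is importable:

```
import asyncio

import aiohttp


class StubAuth:
    """Hypothetical authenticator stub; NOT PFERD's Authenticator class."""

    def __init__(self, username: str, password: str) -> None:
        self._username, self._password = username, password

    async def credentials(self) -> tuple[str, str]:
        return self._username, self._password

    def invalidate_credentials(self) -> None:
        raise RuntimeError("Credentials were rejected")


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        login = KitShibbolethLogin(StubAuth("uxxxx", "hunter2"), tfa_authenticator=None)
        await login.login(session)
        # session now carries the ILIAS cookies and can fetch protected pages


asyncio.run(main())
```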

View File

@@ -0,0 +1,128 @@
from typing import Any, Optional
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError


class ShibbolethLogin:
"""
Login via shibboleth system.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
        Performs the ILIAS Shibboleth authentication dance and saves the login
        cookies it receives.

        This function should only be called when it is detected that you're
        not logged in. The cookies obtained should be good for a few minutes,
        maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{self._ilias_url}/shib_login.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
        url = soup.find("form", {"method": "post"})["action"]
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None


async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())
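
Unlike the KIT-specific class above, this version derives the IdP host from wherever `shib_login.php` redirects to, via `origin()`. A standalone illustration with a made-up URL:

```
# Quick illustration (not PFERD code) of the origin() call used above to find
# the IdP host dynamically instead of hard-coding idp.scc.kit.edu:
import yarl

sso = yarl.URL("https://idp.example-university.edu/idp/profile/SAML2/Redirect/SSO?execution=e1s1")
print(sso.origin())  # https://idp.example-university.edu -- scheme + host only
```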

View File

@@ -1,8 +1,9 @@
 import os
 import re
 from dataclasses import dataclass
+from datetime import datetime
 from pathlib import PurePath
-from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
+from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union
 from urllib.parse import urljoin

 from bs4 import BeautifulSoup, Tag
@@ -31,24 +32,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
         return re.compile(regex)


-@dataclass(unsafe_hash=True)
+@dataclass
 class KitIpdFile:
     name: str
     url: str

+    def explain(self) -> None:
+        log.explain(f"File {self.name!r} (href={self.url!r})")
+

 @dataclass
 class KitIpdFolder:
     name: str
-    files: List[KitIpdFile]
+    entries: List[Union[KitIpdFile, "KitIpdFolder"]]

     def explain(self) -> None:
         log.explain_topic(f"Folder {self.name!r}")
-        for file in self.files:
-            log.explain(f"File {file.name!r} (href={file.url!r})")
+        for entry in self.entries:
+            entry.explain()

-    def __hash__(self) -> int:
-        return self.name.__hash__()


 class KitIpdCrawler(HttpCrawler):
@@ -72,68 +73,83 @@ class KitIpdCrawler(HttpCrawler):
         async with maybe_cl:
             for item in await self._fetch_items():
+                item.explain()
                 if isinstance(item, KitIpdFolder):
-                    tasks.append(self._crawl_folder(item))
+                    tasks.append(self._crawl_folder(PurePath("."), item))
                 else:
-                    # Orphan files are placed in the root folder
-                    tasks.append(self._download_file(PurePath("."), item))
+                    log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})")
+                    log.explain("Attributing it to root folder")
+                    # do this here to at least be sequential and not parallel (rate limiting is hard, as the
+                    # crawl abstraction does not hold for these requests)
+                    etag, mtime = await self._request_resource_version(item.url)
+                    tasks.append(self._download_file(PurePath("."), item, etag, mtime))

         await self.gather(tasks)

-    async def _crawl_folder(self, folder: KitIpdFolder) -> None:
-        path = PurePath(folder.name)
+    async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None:
+        path = parent / folder.name
         if not await self.crawl(path):
             return

-        tasks = [self._download_file(path, file) for file in folder.files]
+        tasks = []
+        for entry in folder.entries:
+            if isinstance(entry, KitIpdFolder):
+                tasks.append(self._crawl_folder(path, entry))
+            else:
+                # do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
+                # abstraction does not hold for these requests)
+                etag, mtime = await self._request_resource_version(entry.url)
+                tasks.append(self._download_file(path, entry, etag, mtime))

         await self.gather(tasks)

-    async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
+    async def _download_file(
+        self,
+        parent: PurePath,
+        file: KitIpdFile,
+        etag: Optional[str],
+        mtime: Optional[datetime]
+    ) -> None:
         element_path = parent / file.name
-        maybe_dl = await self.download(element_path)
+
+        prev_etag = self._get_previous_etag_from_report(element_path)
+        etag_differs = None if prev_etag is None else prev_etag != etag
+
+        maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
         if not maybe_dl:
+            # keep storing the known file's etag
+            if prev_etag:
+                self._add_etag_to_report(element_path, prev_etag)
             return

         async with maybe_dl as (bar, sink):
-            await self._stream_from_url(file.url, sink, bar)
+            await self._stream_from_url(file.url, element_path, sink, bar)

-    async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
+    async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
         page, url = await self.get_page()
         elements: List[Tag] = self._find_file_links(page)
-        items: Set[Union[KitIpdFile, KitIpdFolder]] = set()

+        # do not add unnecessary nesting for a single <h1> heading
+        drop_h1: bool = len(page.find_all(name="h1")) <= 1
+
+        folder_tree: KitIpdFolder = KitIpdFolder(".", [])
         for element in elements:
-            folder_label = self._find_folder_label(element)
-            if folder_label:
-                folder = self._extract_folder(folder_label, url)
-                if folder not in items:
-                    items.add(folder)
-                    folder.explain()
-            else:
-                file = self._extract_file(element, url)
-                items.add(file)
-                log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
-                log.explain("Attributing it to root folder")
+            parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1)
+            file = self._extract_file(element, url)

-        return items
+            current_folder: KitIpdFolder = folder_tree
+            for folder_name in parent.parts:
+                # helps the type checker to verify that current_folder is indeed a folder
+                def subfolders() -> Generator[KitIpdFolder, Any, None]:
+                    return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))

-    def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
-        files: List[KitIpdFile] = []
-        name = folder_tag.getText().strip()
-        container: Tag = folder_tag.findNextSibling(name="table")
-        for link in self._find_file_links(container):
-            files.append(self._extract_file(link, url))
+                if not any(entry.name == folder_name for entry in subfolders()):
+                    current_folder.entries.append(KitIpdFolder(folder_name, []))
+                current_folder = next(entry for entry in subfolders() if entry.name == folder_name)

-        return KitIpdFolder(name, files)
+            current_folder.entries.append(file)

-    @staticmethod
-    def _find_folder_label(file_link: Tag) -> Optional[Tag]:
-        enclosing_table: Tag = file_link.findParent(name="table")
-        if enclosing_table is None:
-            return None
-        return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
+        return folder_tree.entries

     def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
         url = self._abs_url_from_link(url, link)
@@ -146,7 +162,7 @@ class KitIpdCrawler(HttpCrawler):
     def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
         return urljoin(url, link_tag.get("href"))

-    async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
+    async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
         async with self.session.get(url, allow_redirects=False) as resp:
             if resp.status == 403:
                 raise CrawlError("Received a 403. Are you within the KIT network/VPN?")

@@ -159,6 +175,8 @@ class KitIpdCrawler(HttpCrawler):
             sink.done()

+        self._add_etag_to_report(path, resp.headers.get("ETag"))
+
     async def get_page(self) -> Tuple[BeautifulSoup, str]:
         async with self.session.get(self._url) as request:
             # The web page for Algorithmen für Routenplanung contains some
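
The hunks above call `self._request_resource_version(...)`, which this compare view does not show. A plausible, hypothetical sketch of such a helper (the real one lives in `HttpCrawler` and may differ) would fetch only the response headers and parse the two validators the crawler feeds into `download()`:

```
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Optional, Tuple

import aiohttp


async def request_resource_version(
    session: aiohttp.ClientSession, url: str
) -> Tuple[Optional[str], Optional[datetime]]:
    # Ask the server for headers only; no body is transferred
    async with session.head(url, allow_redirects=True) as resp:
        etag = resp.headers.get("ETag")
        last_modified = resp.headers.get("Last-Modified")
        mtime = parsedate_to_datetime(last_modified) if last_modified else None
        return etag, mtime
```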

View File

@@ -14,7 +14,7 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:

 class Deduplicator:
-    FORBIDDEN_CHARS = '<>:"/\\|?*'
+    FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)])
     FORBIDDEN_NAMES = {
         "CON", "PRN", "AUX", "NUL",
         "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",

View File

@@ -59,6 +59,7 @@ class Log:
         # Whether different parts of the output are enabled or disabled
         self.output_explain = False
         self.output_status = True
+        self.output_not_deleted = True
         self.output_report = True

     def _update_live(self) -> None:

@@ -207,6 +208,17 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
             action = escape(f"{action:<{self.STATUS_WIDTH}}")
             self.print(f"{style}{action}[/] {escape(text)} {suffix}")

+    def not_deleted(self, style: str, action: str, text: str, suffix: str = "") -> None:
+        """
+        Print a message for a local only file that wasn't
+        deleted while crawling. Allows markup in the "style"
+        argument which will be applied to the "action" string.
+        """
+        if self.output_status and self.output_not_deleted:
+            action = escape(f"{action:<{self.STATUS_WIDTH}}")
+            self.print(f"{style}{action}[/] {escape(text)} {suffix}")
+
     def report(self, text: str) -> None:
         """
         Print a report after crawling. Allows markup.

@@ -215,6 +227,14 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
         if self.output_report:
             self.print(text)

+    def report_not_deleted(self, text: str) -> None:
+        """
+        Print a report for a local only file that wasn't deleted after crawling. Allows markup.
+        """
+        if self.output_report and self.output_not_deleted:
+            self.print(text)
+
     @contextmanager
     def _bar(
         self,

View File

@@ -44,6 +44,7 @@ class OnConflict(Enum):
     LOCAL_FIRST = "local-first"
     REMOTE_FIRST = "remote-first"
     NO_DELETE = "no-delete"
+    NO_DELETE_PROMPT_OVERWRITE = "no-delete-prompt-overwrite"

     @staticmethod
     def from_string(string: str) -> "OnConflict":

@@ -51,11 +52,12 @@ class OnConflict(Enum):
             return OnConflict(string)
         except ValueError:
             raise ValueError("must be one of 'prompt', 'local-first',"
-                             " 'remote-first', 'no-delete'")
+                             " 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'")


 @dataclass
 class Heuristics:
+    etag_differs: Optional[bool]
     mtime: Optional[datetime]

@@ -232,8 +234,16 @@ class OutputDirectory:
             remote_newer = None

+            # ETag should be a more reliable indicator than mtime, so we check it first
+            if heuristics.etag_differs is not None:
+                remote_newer = heuristics.etag_differs
+                if remote_newer:
+                    log.explain("Remote file's entity tag differs")
+                else:
+                    log.explain("Remote file's entity tag is the same")
+
             # Python on Windows crashes when faced with timestamps around the unix epoch
-            if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
+            if remote_newer is None and heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
                 mtime = heuristics.mtime
                 remote_newer = mtime.timestamp() > stat.st_mtime
                 if remote_newer:

@@ -264,7 +274,7 @@ class OutputDirectory:
         on_conflict: OnConflict,
         path: PurePath,
     ) -> bool:
-        if on_conflict == OnConflict.PROMPT:
+        if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
             async with log.exclusive_output():
                 prompt = f"Replace {fmt_path(path)} with remote file?"
                 return await prompt_yes_no(prompt, default=False)

@@ -283,7 +293,7 @@ class OutputDirectory:
         on_conflict: OnConflict,
         path: PurePath,
     ) -> bool:
-        if on_conflict == OnConflict.PROMPT:
+        if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
             async with log.exclusive_output():
                 prompt = f"Recursively delete {fmt_path(path)} and replace with remote file?"
                 return await prompt_yes_no(prompt, default=False)

@@ -303,7 +313,7 @@ class OutputDirectory:
         path: PurePath,
         parent: PurePath,
     ) -> bool:
-        if on_conflict == OnConflict.PROMPT:
+        if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
             async with log.exclusive_output():
                 prompt = f"Delete {fmt_path(parent)} so remote file {fmt_path(path)} can be downloaded?"
                 return await prompt_yes_no(prompt, default=False)

@@ -330,7 +340,7 @@ class OutputDirectory:
             return False
         elif on_conflict == OnConflict.REMOTE_FIRST:
             return True
-        elif on_conflict == OnConflict.NO_DELETE:
+        elif on_conflict in {OnConflict.NO_DELETE, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
             return False

         # This should never be reached

@@ -365,6 +375,8 @@ class OutputDirectory:
         self,
         remote_path: PurePath,
         path: PurePath,
+        *,
+        etag_differs: Optional[bool] = None,
         mtime: Optional[datetime] = None,
         redownload: Optional[Redownload] = None,
         on_conflict: Optional[OnConflict] = None,

@@ -374,7 +386,7 @@ class OutputDirectory:
         MarkConflictError.
         """

-        heuristics = Heuristics(mtime)
+        heuristics = Heuristics(etag_differs, mtime)
         redownload = self._redownload if redownload is None else redownload
         on_conflict = self._on_conflict if on_conflict is None else on_conflict
         local_path = self.resolve(path)

@@ -495,7 +507,7 @@ class OutputDirectory:
             except OSError:
                 pass
             else:
-                log.status("[bold bright_magenta]", "Not deleted", fmt_path(pure))
+                log.not_deleted("[bold bright_magenta]", "Not deleted", fmt_path(pure))
                 self._report.not_delete_file(pure)

     def load_prev_report(self) -> None:
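
For users, the new mode is picked like any other conflict strategy in the config file; a hypothetical crawler section (option names follow CONFIG.md, the target and names are made up):

```
[crawl:algo]
type = kit-ilias-web
target = 1234567
output_dir = Algo
on_conflict = no-delete-prompt-overwrite
```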

View File

@@ -1,5 +1,6 @@
-from pathlib import Path
+from pathlib import Path, PurePath
 from typing import Dict, List, Optional
+from urllib.parse import quote

 from rich.markup import escape

@@ -168,19 +169,26 @@ class Pferd:
         log.report("")
         log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")

+        def fmt_path_link(relative_path: PurePath) -> str:
+            # We need to URL-encode the path because it might contain spaces or special characters
+            absolute_path = str(crawler.output_dir.resolve(relative_path).absolute())
+            absolute_path = absolute_path.replace("\\\\?\\", "")
+            link = f"file://{quote(absolute_path)}"
+            return f"[link={link}]{fmt_path(relative_path)}[/link]"
+
         something_changed = False
         for path in sorted(crawler.report.added_files):
             something_changed = True
-            log.report(f"  [bold bright_green]Added[/] {fmt_path(path)}")
+            log.report(f"  [bold bright_green]Added[/] {fmt_path_link(path)}")
         for path in sorted(crawler.report.changed_files):
             something_changed = True
-            log.report(f"  [bold bright_yellow]Changed[/] {fmt_path(path)}")
+            log.report(f"  [bold bright_yellow]Changed[/] {fmt_path_link(path)}")
         for path in sorted(crawler.report.deleted_files):
             something_changed = True
             log.report(f"  [bold bright_magenta]Deleted[/] {fmt_path(path)}")
         for path in sorted(crawler.report.not_deleted_files):
             something_changed = True
-            log.report(f"  [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
+            log.report_not_deleted(f"  [bold bright_magenta]Not deleted[/] {fmt_path_link(path)}")

         for warning in crawler.report.encountered_warnings:
             something_changed = True
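
The clickable links depend on `quote` percent-encoding the absolute path; a quick standalone illustration with a made-up path:

```
from urllib.parse import quote

absolute_path = "/home/user/Uni/Algo 2/Blatt 3.pdf"
print(f"file://{quote(absolute_path)}")
# file:///home/user/Uni/Algo%202/Blatt%203.pdf
```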

View File

@@ -110,6 +110,10 @@ class ExactReTf(Transformation):
             except ValueError:
                 pass

+        named_groups: Dict[str, str] = match.groupdict()
+        for name, capture in named_groups.items():
+            locals_dir[name] = capture
+
         result = eval(f"f{right!r}", {}, locals_dir)
         return Transformed(PurePath(result))
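
A standalone illustration (not PFERD code) of what this buys: the rule's right-hand side is evaluated as an f-string with all captures in scope, so named groups become referable by name instead of `g1`, `g2`, ...:

```
import re
from pathlib import PurePath

# A rule's left side, with a named capture group, applied to a file name
match = re.fullmatch(r"Blatt (?P<week>\d+)\.pdf", "Blatt 3.pdf")
assert match is not None

# Numbered groups are exposed as g1, g2, ...; named groups under their own name
locals_dir = {f"g{i}": group for i, group in enumerate(match.groups(), start=1)}
locals_dir.update(match.groupdict())

# The rule's right side, evaluated exactly like in ExactReTf above
right = "Blatt_{week}.pdf"
print(PurePath(eval(f"f{right!r}", {}, locals_dir)))  # Blatt_3.pdf
```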

View File

@@ -1,2 +1,2 @@
 NAME = "PFERD"
-VERSION = "3.4.3"
+VERSION = "3.7.0"

View File

@@ -56,6 +56,17 @@ Also, you can download most ILIAS pages directly like this:
 $ pferd kit-ilias-web <url> <output_directory>
 ```

+PFERD supports other ILIAS instances as well, using the `ilias-web` crawler (see
+the [config section on `ilias-web`](CONFIG.md#the-ilias-web-crawler) for more
+detail on the `base-url` and `client-id` parameters):
+
+```
+$ pferd ilias-web \
+    --base-url https://ilias.my-university.example \
+    --client-id My_University desktop \
+    <output_directory>
+```
+
 However, the CLI only lets you download a single thing at a time, and the
 resulting command can grow long quite quickly. Because of this, PFERD can also
 be used with a config file.
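
For reference, the same crawl could be expressed as a config file section; a hypothetical example (option names per CONFIG.md, everything else made up):

```
[crawl:my-uni]
type = ilias-web
base_url = https://ilias.my-university.example
client_id = My_University
target = desktop
output_dir = ilias
```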

flake.lock generated Normal file
View File

@@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1708979614,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

flake.nix Normal file
View File

@@ -0,0 +1,41 @@
{
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11";
};
outputs = { self, nixpkgs }:
let
# Helper function to generate an attrset '{ x86_64-linux = f "x86_64-linux"; ... }'.
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed;
in
{
packages = forAllSystems (system:
let pkgs = import nixpkgs { inherit system; };
in
rec {
default = pkgs.python3Packages.buildPythonApplication rec {
pname = "pferd";
# Performing black magic
# Don't worry, I sacrificed enough goats for the next few years
version = (pkgs.lib.importTOML ./PFERD/version.py).VERSION;
format = "pyproject";
src = ./.;
nativeBuildInputs = with pkgs.python3Packages; [
setuptools
];
propagatedBuildInputs = with pkgs.python3Packages; [
aiohttp
beautifulsoup4
rich
keyring
certifi
];
};
});
};
}
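
With the flake in place, a checkout should build and run with the standard flake commands (illustrative):

```
$ nix build        # builds the default package defined above
$ nix run . -- --help
```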

View File

@@ -1,11 +0,0 @@
[mypy]
disallow_any_generics = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
no_implicit_optional = True
warn_unused_ignores = True
warn_unreachable = True
show_error_context = True
[mypy-rich.*,bs4,keyring]
ignore_missing_imports = True

View File

@@ -1,3 +1,42 @@
 [build-system]
 requires = ["setuptools", "wheel"]
 build-backend = "setuptools.build_meta"
+
+[project]
+name = "PFERD"
+dependencies = [
+    "aiohttp>=3.8.1",
+    "beautifulsoup4>=4.10.0",
+    "rich>=11.0.0",
+    "keyring>=23.5.0",
+    "certifi>=2021.10.8"
+]
+dynamic = ["version"]
+requires-python = ">=3.9"
+
+[project.scripts]
+pferd = "PFERD.__main__:main"
+
+[tool.setuptools.dynamic]
+version = {attr = "PFERD.version.VERSION"}
+
+[tool.flake8]
+max-line-length = 110
+
+[tool.isort]
+line_length = 110
+
+[tool.autopep8]
+max_line_length = 110
+in-place = true
+recursive = true
+
+[tool.mypy]
+disallow_any_generics = true
+disallow_untyped_defs = true
+disallow_incomplete_defs = true
+no_implicit_optional = true
+warn_unused_ignores = true
+warn_unreachable = true
+show_error_context = true
+ignore_missing_imports = true

View File

@@ -1,8 +1,8 @@
 #!/usr/bin/env python3

 import argparse
-import time
 import re
+import time

 from subprocess import run

View File

@@ -2,5 +2,5 @@

 set -e

-mypy PFERD
+mypy .
 flake8 PFERD

View File

@@ -2,5 +2,5 @@

 set -e

-autopep8 --recursive --in-place PFERD
-isort PFERD
+autopep8 .
+isort .

View File

@@ -13,5 +13,5 @@ pip install --upgrade setuptools
 pip install --editable .

 # Installing tools and type hints
-pip install --upgrade mypy flake8 autopep8 isort pyinstaller
+pip install --upgrade mypy flake8 flake8-pyproject autopep8 isort pyinstaller
 pip install --upgrade types-chardet types-certifi

View File

@@ -1,23 +0,0 @@
[metadata]
name = PFERD
version = attr: PFERD.version.VERSION
[options]
packages = find:
python_requires = >=3.9
install_requires =
aiohttp>=3.8.1
beautifulsoup4>=4.10.0
rich>=11.0.0
keyring>=23.5.0
certifi>=2021.10.8
[options.entry_points]
console_scripts =
pferd = PFERD.__main__:main
[flake8]
max_line_length = 110
[isort]
line_length = 110