Compare commits

..

1 Commits

Author SHA1 Message Date
03efa17cf1 Print mtime before updating file metadata 2023-09-23 13:01:58 +02:00
34 changed files with 1460 additions and 2046 deletions

View File

@ -1,10 +0,0 @@
version: 2
updates:
- package-ecosystem: github-actions
directory: /
schedule:
interval: monthly
groups:
gh-actions:
patterns:
- "*"

View File

@ -1,6 +1,6 @@
name: build-and-release name: build-and-release
on: [push, pull_request] on: push
defaults: defaults:
run: run:
@ -13,12 +13,13 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest, macos-13, macos-latest] os: [ubuntu-latest, windows-latest, macos-latest]
python: ["3.9"] python: ["3.9"]
steps: steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5 - uses: actions/checkout@v3
- uses: actions/setup-python@v4
with: with:
python-version: ${{ matrix.python }} python-version: ${{ matrix.python }}
@ -33,12 +34,7 @@ jobs:
run: ./scripts/setup --no-pip run: ./scripts/setup --no-pip
- name: Run checks - name: Run checks
run: | run: ./scripts/check
./scripts/check
./scripts/format
- name: Assert no changes
run: git diff --exit-code
- name: Build - name: Build
run: ./scripts/build run: ./scripts/build
@ -49,9 +45,9 @@ jobs:
run: mv dist/pferd* dist/pferd-${{ matrix.os }} run: mv dist/pferd* dist/pferd-${{ matrix.os }}
- name: Upload binary - name: Upload binary
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v3
with: with:
name: pferd-${{ matrix.os }} name: Binaries
path: dist/pferd-${{ matrix.os }} path: dist/pferd-${{ matrix.os }}
release: release:
@ -61,20 +57,18 @@ jobs:
steps: steps:
- name: Download binaries - name: Download binaries
uses: actions/download-artifact@v4 uses: actions/download-artifact@v3
with: with:
pattern: pferd-* name: Binaries
merge-multiple: true
- name: Rename binaries - name: Rename binaries
run: | run: |
mv pferd-ubuntu-latest pferd-linux mv pferd-ubuntu-latest pferd-linux
mv pferd-windows-latest pferd-windows.exe mv pferd-windows-latest pferd-windows.exe
mv pferd-macos-13 pferd-mac-x86_64
mv pferd-macos-latest pferd-mac mv pferd-macos-latest pferd-mac
- name: Create release - name: Create release
uses: softprops/action-gh-release@v2 uses: softprops/action-gh-release@v1
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with: with:
@ -82,4 +76,3 @@ jobs:
pferd-linux pferd-linux
pferd-windows.exe pferd-windows.exe
pferd-mac pferd-mac
pferd-mac-x86_64

View File

@ -22,54 +22,6 @@ ambiguous situations.
## Unreleased ## Unreleased
## 3.7.0 - 2024-11-13
### Added
- Support for MOB videos in page descriptions
- Clickable links in the report to directly open new/modified/not-deleted files
- Support for non KIT shibboleth login
### Changed
- Remove videos from description pages
- Perform ILIAS cycle detection after processing the transform to allow
ignoring duplicated elements
- Parse headings (h1-h3) as folders in kit-ipd crawler
### Fixed
- Personal desktop/dashboard/favorites crawling
- Crawling of nested courses
- Downloading of links with no target URL
- Handle row flex on description pages
- Add `<!DOCTYPE html>` heading to forum threads to fix mime type detection
- Handle groups in cards
## 3.6.0 - 2024-10-23
### Added
- Generic `ilias-web` crawler and `ilias-web` CLI command
- Support for the course overview page. Using this URL as a target might cause
duplication warnings, as subgroups are listed separately.
- Support for named capture groups in regex transforms
- Crawl custom item groups as folders
### Fixed
- Normalization of meeting names in cards
- Sanitization of slashes in exercise container names
## 3.5.2 - 2024-04-14
### Fixed
- Crawling of personal desktop with ILIAS 8
- Crawling of empty personal desktops
## 3.5.1 - 2024-04-09
### Added
- Support for ILIAS 8
### Fixed
- Video name deduplication
## 3.5.0 - 2023-09-13 ## 3.5.0 - 2023-09-13
### Added ### Added

View File

@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header,
which is followed by a list of `key = value` pairs. Comments must be on their which is followed by a list of `key = value` pairs. Comments must be on their
own line and start with `#`. Multiline values must be indented beyond their key. own line and start with `#`. Multiline values must be indented beyond their key.
Boolean values can be `yes` or `no`. For more details and some examples on the Boolean values can be `yes` or `no`. For more details and some examples on the
format, see the [configparser documentation][cp-file] format, see the [configparser documentation][1] ([interpolation][2] is
([interpolation][cp-interp] is disabled). disabled).
[cp-file]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure" [1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
[cp-interp]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values" [2]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
## The `DEFAULT` section ## The `DEFAULT` section
@ -146,7 +146,7 @@ crawler simulate a slower, network-based crawler.
This crawler crawls a KIT-IPD page by url. The root page can be crawled from This crawler crawls a KIT-IPD page by url. The root page can be crawled from
outside the KIT network so you will be informed about any new/deleted files, outside the KIT network so you will be informed about any new/deleted files,
but downloading files requires you to be within. Adding a short delay between but downloading files requires you to be within. Adding a show delay between
requests is likely a good idea. requests is likely a good idea.
- `target`: URL to a KIT-IPD page - `target`: URL to a KIT-IPD page
@ -154,63 +154,6 @@ requests is likely a good idea.
matches, the given link is downloaded as a file. This is used to extract matches, the given link is downloaded as a file. This is used to extract
files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`) files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`)
### The `ilias-web` crawler
This crawler crawls a generic ILIAS instance.
Inspired by [this ILIAS downloader][ilias-dl], the following configurations should work
out of the box for the corresponding universities:
[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs"
| University | `base_url` | `login_type` | `client_id` |
|---------------|-----------------------------------------|--------------|---------------|
| FH Aachen | https://www.ili.fh-aachen.de | local | elearning |
| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk |
| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ |
| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart |
| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | |
If your university isn't listed, try navigating to your instance's login page.
Assuming no custom login service is used, the URL will look something like this:
```jinja
{{ base_url }}/login.php?client_id={{ client_id }}&cmd=force_login&lang=
```
If the values work, feel free to submit a PR and add them to the table above.
- `base_url`: The URL where the ILIAS instance is located. (Required)
- `login_type`: How you authenticate. (Required)
- `local`: Use `client_id` for authentication.
- `shibboleth`: Use shibboleth for authentication.
- `client_id`: An ID used for authentication if `login_type` is `local`. Is
ignored if `login_type` is `shibboleth`.
- `target`: The ILIAS element to crawl. (Required)
- `desktop`: Crawl your personal desktop / dashboard
- `<course id>`: Crawl the course with the given id
- `<url>`: Crawl a given element by URL (preferably the permanent URL linked
at the bottom of its ILIAS page).
This also supports the "My Courses" overview page to download *all*
courses. Note that this might produce confusing local directory layouts
and duplication warnings if you are a member of an ILIAS group. The
`desktop` target is generally preferable.
- `auth`: Name of auth section to use for login. (Required)
- `tfa_auth`: Name of auth section to use for two-factor authentication. Only
uses the auth section's password. (Default: Anonymous `tfa` authenticator)
- `links`: How to represent external links. (Default: `fancy`)
- `ignore`: Don't download links.
- `plaintext`: A text file containing only the URL.
- `fancy`: A HTML file looking like the ILIAS link element.
- `internet-shortcut`: An internet shortcut file (`.url` file).
- `link_redirect_delay`: Time (in seconds) until `fancy` link files will
redirect to the actual URL. Set to a negative value to disable the automatic
redirect. (Default: `-1`)
- `videos`: Whether to download videos. (Default: `no`)
- `forums`: Whether to download forum threads. (Default: `no`)
- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default:
`20.0`)
### The `kit-ilias-web` crawler ### The `kit-ilias-web` crawler
This crawler crawls the KIT ILIAS instance. This crawler crawls the KIT ILIAS instance.
@ -289,10 +232,10 @@ is stored in the keyring.
### The `pass` authenticator ### The `pass` authenticator
This authenticator queries the [`pass` password manager][pass] for a username This authenticator queries the [`pass` password manager][3] for a username and
and password. It tries to be mostly compatible with [browserpass][browserpass] password. It tries to be mostly compatible with [browserpass][4] and
and [passff][passff], so see those links for an overview of the format. If PFERD [passff][5], so see those links for an overview of the format. If PFERD fails
fails to load your password, you can use the `--explain` flag to see why. to load your password, you can use the `--explain` flag to see why.
- `passname`: The name of the password to use (Required) - `passname`: The name of the password to use (Required)
- `username_prefixes`: A comma-separated list of username line prefixes - `username_prefixes`: A comma-separated list of username line prefixes
@ -300,9 +243,9 @@ fails to load your password, you can use the `--explain` flag to see why.
- `password_prefixes`: A comma-separated list of password line prefixes - `password_prefixes`: A comma-separated list of password line prefixes
(Default: `password,pass,secret`) (Default: `password,pass,secret`)
[pass]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager" [3]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager"
[browserpass]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store" [4]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store"
[passff]: <https://github.com/passff/passff#multi-line-format> "Multi-line format" [5]: <https://github.com/passff/passff#multi-line-format> "Multi-line format"
### The `tfa` authenticator ### The `tfa` authenticator
@ -401,8 +344,7 @@ matches `SOURCE`, the output path is created using `TARGET` as template.
be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path. be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path.
If capturing group *n*'s contents are a valid integer, the integer value is If capturing group *n*'s contents are a valid integer, the integer value is
available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a
valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). Named capture valid float, the float value is available as `{f<n>}` (e.g. `{f3}`). If a
groups (e.g. `(?P<name>)`) are available by their name (e.g. `{name}`). If a
capturing group is not present (e.g. when matching the string `cd` with the capturing group is not present (e.g. when matching the string `cd` with the
regex `(ab)?cd`), the corresponding variables are not defined. regex `(ab)?cd`), the corresponding variables are not defined.

View File

@ -1,6 +1,6 @@
Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero, TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine, p-fruck, PinieP Mr-Pine
Permission is hereby granted, free of charge, to any person obtaining a copy of Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in this software and associated documentation files (the "Software"), to deal in

View File

@ -8,7 +8,6 @@
# well. # well.
from . import command_local # noqa: F401 imported but unused from . import command_local # noqa: F401 imported but unused
from . import command_ilias_web # noqa: F401 imported but unused
from . import command_kit_ilias_web # noqa: F401 imported but unused from . import command_kit_ilias_web # noqa: F401 imported but unused
from . import command_kit_ipd # noqa: F401 imported but unused from . import command_kit_ipd # noqa: F401 imported but unused
from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused

View File

@ -1,56 +0,0 @@
import argparse
import configparser
from ..logging import log
from .common_ilias_args import configure_common_group_args, load_common
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
COMMAND_NAME = "ilias-web"
SUBPARSER = SUBPARSERS.add_parser(
COMMAND_NAME,
parents=[CRAWLER_PARSER],
)
GROUP = SUBPARSER.add_argument_group(
title=f"{COMMAND_NAME} crawler arguments",
description=f"arguments for the '{COMMAND_NAME}' crawler",
)
GROUP.add_argument(
"--base-url",
type=str,
metavar="BASE_URL",
help="The base url of the ilias instance"
)
GROUP.add_argument(
"--client-id",
type=str,
metavar="CLIENT_ID",
help="The client id of the ilias instance"
)
configure_common_group_args(GROUP)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain(f"Creating config for command '{COMMAND_NAME}'")
parser["crawl:ilias"] = {}
section = parser["crawl:ilias"]
load_crawler(args, section)
section["type"] = COMMAND_NAME
if args.ilias_url is not None:
section["base_url"] = args.ilias_url
if args.client_id is not None:
section["client_id"] = args.client_id
load_common(section, args, parser)
SUBPARSER.set_defaults(command=load)

View File

@ -1,37 +1,120 @@
import argparse import argparse
import configparser import configparser
from pathlib import Path
from ..crawl.ilias.file_templates import Links
from ..logging import log from ..logging import log
from .common_ilias_args import configure_common_group_args, load_common from .parser import (CRAWLER_PARSER, SUBPARSERS, BooleanOptionalAction, ParserLoadError, load_crawler,
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler show_value_error)
COMMAND_NAME = "kit-ilias-web"
SUBPARSER = SUBPARSERS.add_parser( SUBPARSER = SUBPARSERS.add_parser(
COMMAND_NAME, "kit-ilias-web",
parents=[CRAWLER_PARSER], parents=[CRAWLER_PARSER],
) )
GROUP = SUBPARSER.add_argument_group( GROUP = SUBPARSER.add_argument_group(
title=f"{COMMAND_NAME} crawler arguments", title="kit-ilias-web crawler arguments",
description=f"arguments for the '{COMMAND_NAME}' crawler", description="arguments for the 'kit-ilias-web' crawler",
)
GROUP.add_argument(
"target",
type=str,
metavar="TARGET",
help="course id, 'desktop', or ILIAS URL to crawl"
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
)
GROUP.add_argument(
"--username", "-u",
type=str,
metavar="USERNAME",
help="user name for authentication"
)
GROUP.add_argument(
"--keyring",
action=BooleanOptionalAction,
help="use the system keyring to store and retrieve passwords"
)
GROUP.add_argument(
"--credential-file",
type=Path,
metavar="PATH",
help="read username and password from a credential file"
)
GROUP.add_argument(
"--links",
type=show_value_error(Links.from_string),
metavar="OPTION",
help="how to represent external links"
)
GROUP.add_argument(
"--link-redirect-delay",
type=int,
metavar="SECONDS",
help="time before 'fancy' links redirect to to their target (-1 to disable)"
)
GROUP.add_argument(
"--videos",
action=BooleanOptionalAction,
help="crawl and download videos"
)
GROUP.add_argument(
"--forums",
action=BooleanOptionalAction,
help="crawl and download forum posts"
)
GROUP.add_argument(
"--http-timeout", "-t",
type=float,
metavar="SECONDS",
help="timeout for all HTTP requests"
) )
configure_common_group_args(GROUP)
def load( def load(
args: argparse.Namespace, args: argparse.Namespace,
parser: configparser.ConfigParser, parser: configparser.ConfigParser,
) -> None: ) -> None:
log.explain(f"Creating config for command '{COMMAND_NAME}'") log.explain("Creating config for command 'kit-ilias-web'")
parser["crawl:ilias"] = {} parser["crawl:ilias"] = {}
section = parser["crawl:ilias"] section = parser["crawl:ilias"]
load_crawler(args, section) load_crawler(args, section)
section["type"] = COMMAND_NAME section["type"] = "kit-ilias-web"
load_common(section, args, parser) section["target"] = str(args.target)
section["output_dir"] = str(args.output)
section["auth"] = "auth:ilias"
if args.links is not None:
section["links"] = str(args.links.value)
if args.link_redirect_delay is not None:
section["link_redirect_delay"] = str(args.link_redirect_delay)
if args.videos is not None:
section["videos"] = "yes" if args.videos else "no"
if args.forums is not None:
section["forums"] = "yes" if args.forums else "no"
if args.http_timeout is not None:
section["http_timeout"] = str(args.http_timeout)
parser["auth:ilias"] = {}
auth_section = parser["auth:ilias"]
if args.credential_file is not None:
if args.username is not None:
raise ParserLoadError("--credential-file and --username can't be used together")
if args.keyring:
raise ParserLoadError("--credential-file and --keyring can't be used together")
auth_section["type"] = "credential-file"
auth_section["path"] = str(args.credential_file)
elif args.keyring:
auth_section["type"] = "keyring"
else:
auth_section["type"] = "simple"
if args.username is not None:
auth_section["username"] = args.username
SUBPARSER.set_defaults(command=load) SUBPARSER.set_defaults(command=load)

View File

@ -1,104 +0,0 @@
import argparse
import configparser
from pathlib import Path
from ..crawl.ilias.file_templates import Links
from .parser import BooleanOptionalAction, ParserLoadError, show_value_error
def configure_common_group_args(group: argparse._ArgumentGroup) -> None:
"""These arguments are shared between the KIT and generic Ilias web command."""
group.add_argument(
"target",
type=str,
metavar="TARGET",
help="course id, 'desktop', or ILIAS URL to crawl"
)
group.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
)
group.add_argument(
"--username", "-u",
type=str,
metavar="USERNAME",
help="user name for authentication"
)
group.add_argument(
"--keyring",
action=BooleanOptionalAction,
help="use the system keyring to store and retrieve passwords"
)
group.add_argument(
"--credential-file",
type=Path,
metavar="PATH",
help="read username and password from a credential file"
)
group.add_argument(
"--links",
type=show_value_error(Links.from_string),
metavar="OPTION",
help="how to represent external links"
)
group.add_argument(
"--link-redirect-delay",
type=int,
metavar="SECONDS",
help="time before 'fancy' links redirect to to their target (-1 to disable)"
)
group.add_argument(
"--videos",
action=BooleanOptionalAction,
help="crawl and download videos"
)
group.add_argument(
"--forums",
action=BooleanOptionalAction,
help="crawl and download forum posts"
)
group.add_argument(
"--http-timeout", "-t",
type=float,
metavar="SECONDS",
help="timeout for all HTTP requests"
)
def load_common(
section: configparser.SectionProxy,
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
"""Load common config between generic and KIT ilias web command"""
section["target"] = str(args.target)
section["output_dir"] = str(args.output)
section["auth"] = "auth:ilias"
if args.links is not None:
section["links"] = str(args.links.value)
if args.link_redirect_delay is not None:
section["link_redirect_delay"] = str(args.link_redirect_delay)
if args.videos is not None:
section["videos"] = "yes" if args.videos else "no"
if args.forums is not None:
section["forums"] = "yes" if args.forums else "no"
if args.http_timeout is not None:
section["http_timeout"] = str(args.http_timeout)
parser["auth:ilias"] = {}
auth_section = parser["auth:ilias"]
if args.credential_file is not None:
if args.username is not None:
raise ParserLoadError("--credential-file and --username can't be used together")
if args.keyring:
raise ParserLoadError("--credential-file and --keyring can't be used together")
auth_section["type"] = "credential-file"
auth_section["path"] = str(args.credential_file)
elif args.keyring:
auth_section["type"] = "keyring"
else:
auth_section["type"] = "simple"
if args.username is not None:
auth_section["username"] = args.username

View File

@ -4,7 +4,7 @@ from typing import Callable, Dict
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config from ..config import Config
from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401 from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection
@ -18,8 +18,6 @@ CrawlerConstructor = Callable[[
CRAWLERS: Dict[str, CrawlerConstructor] = { CRAWLERS: Dict[str, CrawlerConstructor] = {
"local": lambda n, s, c, a: "local": lambda n, s, c, a:
LocalCrawler(n, LocalCrawlerSection(s), c), LocalCrawler(n, LocalCrawlerSection(s), c),
"ilias-web": lambda n, s, c, a:
IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
"kit-ilias-web": lambda n, s, c, a: "kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a: "kit-ipd": lambda n, s, c, a:

View File

@ -258,10 +258,6 @@ class Crawler(ABC):
def prev_report(self) -> Optional[Report]: def prev_report(self) -> Optional[Report]:
return self._output_dir.prev_report return self._output_dir.prev_report
@property
def output_dir(self) -> OutputDirectory:
return self._output_dir
@staticmethod @staticmethod
async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
""" """
@ -297,8 +293,6 @@ class Crawler(ABC):
async def download( async def download(
self, self,
path: PurePath, path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None, mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None, redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None, on_conflict: Optional[OnConflict] = None,
@ -313,14 +307,7 @@ class Crawler(ABC):
log.status("[bold bright_black]", "Ignored", fmt_path(path)) log.status("[bold bright_black]", "Ignored", fmt_path(path))
return None return None
fs_token = await self._output_dir.download( fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict)
path,
transformed_path,
etag_differs=etag_differs,
mtime=mtime,
redownload=redownload,
on_conflict=on_conflict
)
if fs_token is None: if fs_token is None:
log.explain("Answer: No") log.explain("Answer: No")
return None return None

View File

@ -1,14 +1,12 @@
import asyncio import asyncio
import http.cookies import http.cookies
import ssl import ssl
from datetime import datetime
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional
import aiohttp import aiohttp
import certifi import certifi
from aiohttp.client import ClientTimeout from aiohttp.client import ClientTimeout
from bs4 import Tag
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config from ..config import Config
@ -17,8 +15,6 @@ from ..utils import fmt_real_path
from ..version import NAME, VERSION from ..version import NAME, VERSION
from .crawler import Crawler, CrawlerSection from .crawler import Crawler, CrawlerSection
ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags"
class HttpCrawlerSection(CrawlerSection): class HttpCrawlerSection(CrawlerSection):
def http_timeout(self) -> float: def http_timeout(self) -> float:
@ -173,78 +169,6 @@ class HttpCrawler(Crawler):
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e)) log.warn(str(e))
@staticmethod
def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
"""
Retrieves the hierarchy of headings associated with the give file link and constructs a folder
structure from them.
<h1> level headings usually only appear once and serve as the page title, so they would introduce
redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter.
"""
def find_associated_headings(tag: Tag, level: int) -> PurePath:
if level == 0 or (level == 1 and drop_h1):
return PurePath()
level_heading = tag.find_previous(name=f"h{level}")
if level_heading is None:
return find_associated_headings(tag, level - 1)
folder_name = level_heading.getText().strip()
return find_associated_headings(level_heading, level - 1) / folder_name
# start at level <h3> because paragraph-level headings are usually too granular for folder names
return find_associated_headings(file_link, 3)
def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
"""
If available, retrieves the entity tag for a given path which was stored in the previous report.
"""
if not self._output_dir.prev_report:
return None
etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
return etags.get(str(path))
def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None:
"""
Adds an entity tag for a given path to the report's custom values.
"""
if not etag:
return
etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {}
etags[str(path)] = etag
self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags)
async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]:
"""
Requests the ETag and Last-Modified headers of a resource via a HEAD request.
If no entity tag / modification date can be obtained, the according value will be None.
"""
try:
async with self.session.head(resource_url) as resp:
if resp.status != 200:
return None, None
etag_header = resp.headers.get("ETag")
last_modified_header = resp.headers.get("Last-Modified")
if last_modified_header:
try:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives
datetime_format = "%a, %d %b %Y %H:%M:%S GMT"
last_modified = datetime.strptime(last_modified_header, datetime_format)
except ValueError:
# last_modified remains None
pass
return etag_header, last_modified
except aiohttp.ClientError:
return None, None
async def run(self) -> None: async def run(self) -> None:
self._request_count = 0 self._request_count = 0
self._cookie_jar = aiohttp.CookieJar() self._cookie_jar = aiohttp.CookieJar()
@ -262,12 +186,7 @@ class HttpCrawler(Crawler):
connect=self._http_timeout, connect=self._http_timeout,
sock_connect=self._http_timeout, sock_connect=self._http_timeout,
sock_read=self._http_timeout, sock_read=self._http_timeout,
), )
# See https://github.com/aio-libs/aiohttp/issues/6626
# Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the
# passed signature. Shibboleth will not accept the broken signature and authentication will
# fail.
requote_redirect_url=False
) as session: ) as session:
self.session = session self.session = session
try: try:

View File

@ -1,9 +1,3 @@
from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, from .kit_ilias_web_crawler import KitIliasWebCrawler, KitIliasWebCrawlerSection
KitIliasWebCrawlerSection)
__all__ = [ __all__ = ["KitIliasWebCrawler", "KitIliasWebCrawlerSection"]
"IliasWebCrawler",
"IliasWebCrawlerSection",
"KitIliasWebCrawler",
"KitIliasWebCrawlerSection",
]

View File

@ -1,40 +0,0 @@
import asyncio
from typing import Any, Callable, Optional
import aiohttp
from ...logging import log
from ..crawler import AWrapped, CrawlError, CrawlWarning
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
def decorator(f: AWrapped) -> AWrapped:
async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
last_exception: Optional[BaseException] = None
for round in range(attempts):
try:
return await f(*args, **kwargs)
except aiohttp.ContentTypeError: # invalid content type
raise CrawlWarning("ILIAS returned an invalid content type")
except aiohttp.TooManyRedirects:
raise CrawlWarning("Got stuck in a redirect loop")
except aiohttp.ClientPayloadError as e: # encoding or not enough bytes
last_exception = e
except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc.
last_exception = e
except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler
last_exception = e
log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}")
log.explain(f"Last exception: {last_exception!r}")
if last_exception:
message = f"Error in I/O Operation: {last_exception!r}"
if failure_is_error:
raise CrawlError(message) from last_exception
else:
raise CrawlWarning(message) from last_exception
raise CrawlError("Impossible return in ilias _iorepeat")
return wrapper # type: ignore
return decorator

View File

@ -12,13 +12,6 @@ _STYLE_TAG_CONTENT = """
font-weight: bold; font-weight: bold;
} }
.row-flex {
display: flex;
}
.row-flex-wrap {
flex-wrap: wrap;
}
.accordion-head { .accordion-head {
background-color: #f5f7fa; background-color: #f5f7fa;
padding: 0.5rem 0; padding: 0.5rem 0;
@ -92,11 +85,6 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup:
if isinstance(type(children[0]), Comment): if isinstance(type(children[0]), Comment):
dummy.decompose() dummy.decompose()
# Delete video figures, as they can not be internalized anyway
for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"):
if figure := video.find_parent("figure"):
figure.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"): for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr")) hrule_imposter.insert(0, soup.new_tag("hr"))

File diff suppressed because it is too large Load Diff

View File

@ -15,27 +15,25 @@ TargetType = Union[str, int]
class IliasElementType(Enum): class IliasElementType(Enum):
BOOKING = "booking"
COURSE = "course"
EXERCISE = "exercise" EXERCISE = "exercise"
EXERCISE_FILES = "exercise_files" # own submitted files EXERCISE_FILES = "exercise_files" # own submitted files
TEST = "test" # an online test. Will be ignored currently.
FILE = "file" FILE = "file"
FOLDER = "folder" FOLDER = "folder"
FORUM = "forum" FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab" INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module" LEARNING_MODULE = "learning_module"
LINK = "link" BOOKING = "booking"
MEDIACAST_VIDEO = "mediacast_video"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEETING = "meeting" MEETING = "meeting"
MOB_VIDEO = "mob_video" SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEDIACAST_VIDEO = "mediacast_video"
OPENCAST_VIDEO = "opencast_video" OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder" OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated" OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
SCORM_LEARNING_MODULE = "scorm_learning_module"
SURVEY = "survey"
TEST = "test" # an online test. Will be ignored currently.
@dataclass @dataclass
@ -50,10 +48,6 @@ class IliasPageElement:
regexes = [ regexes = [
r"eid=(?P<id>[0-9a-z\-]+)", r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)", r"file_(?P<id>\d+)",
r"copa_(?P<id>\d+)",
r"fold_(?P<id>\d+)",
r"frm_(?P<id>\d+)",
r"exc_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)", r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)", r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)" r"mm_(?P<id>\d+)"
@ -67,52 +61,6 @@ class IliasPageElement:
log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.") log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
return self.url return self.url
@staticmethod
def create_new(
typ: IliasElementType,
url: str,
name: str,
mtime: Optional[datetime] = None,
description: Optional[str] = None,
skip_sanitize: bool = False
) -> 'IliasPageElement':
if typ == IliasElementType.MEETING:
normalized = IliasPageElement._normalize_meeting_name(name)
log.explain(f"Normalized meeting name from {name!r} to {normalized!r}")
name = normalized
if not skip_sanitize:
name = _sanitize_path_name(name)
return IliasPageElement(typ, url, name, mtime, description)
@staticmethod
def _normalize_meeting_name(meeting_name: str) -> str:
"""
Normalizes meeting names, which have a relative time as their first part,
to their date in ISO format.
"""
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
split_delimiter = ", "
# We have a meeting day without time
date_portion_str = meeting_name.split(split_delimiter)[0]
date_portion = demangle_date(date_portion_str)
# We failed to parse the date, bail out
if not date_portion:
return meeting_name
# Replace the first section with the absolute date
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
@dataclass @dataclass
class IliasDownloadForumData: class IliasDownloadForumData:
@ -147,9 +95,13 @@ class IliasPage:
@staticmethod @staticmethod
def is_root_page(soup: BeautifulSoup) -> bool: def is_root_page(soup: BeautifulSoup) -> bool:
if permalink := IliasPage.get_soup_permalink(soup): permalink = soup.find(id="current_perma_link")
return "goto.php?target=root_" in permalink if permalink is None:
return False return False
value = permalink.attrs.get("value")
if value is None:
return False
return "goto.php?target=root_" in value
def get_child_elements(self) -> List[IliasPageElement]: def get_child_elements(self) -> List[IliasPageElement]:
""" """
@ -182,7 +134,7 @@ class IliasPage:
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x} attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
) )
if tab is not None: if tab is not None:
return IliasPageElement.create_new( return IliasPageElement(
IliasElementType.INFO_TAB, IliasElementType.INFO_TAB,
self._abs_url_from_link(tab), self._abs_url_from_link(tab),
"infos" "infos"
@ -324,17 +276,19 @@ class IliasPage:
return False return False
def _is_personal_desktop(self) -> bool: def _is_personal_desktop(self) -> bool:
return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower() return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
def _is_content_page(self) -> bool: def _is_content_page(self) -> bool:
if link := self.get_permalink(): link = self._soup.find(id="current_perma_link")
return "target=copa_" in link if not link:
return False return False
return "target=copa_" in link.get("value")
def _is_learning_module_page(self) -> bool: def _is_learning_module_page(self) -> bool:
if link := self.get_permalink(): link = self._soup.find(id="current_perma_link")
return "target=pg_" in link if not link:
return False return False
return "target=pg_" in link.get("value")
def _contains_collapsed_future_meetings(self) -> bool: def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None return self._uncollapse_future_meetings_url() is not None
@ -347,7 +301,7 @@ class IliasPage:
if not element: if not element:
return None return None
link = self._abs_url_from_link(element) link = self._abs_url_from_link(element)
return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings") return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings")
def _is_content_tab_selected(self) -> bool: def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None return self._select_content_page_url() is None
@ -356,9 +310,6 @@ class IliasPage:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _is_course_overview_page(self) -> bool:
return "baseClass=ilmembershipoverviewgui" in self._page_url
def _select_content_page_url(self) -> Optional[IliasPageElement]: def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find( tab = self._soup.find(
id="tab_view_content", id="tab_view_content",
@ -370,7 +321,7 @@ class IliasPage:
link = tab.find("a") link = tab.find("a")
if link: if link:
link = self._abs_url_from_link(link) link = self._abs_url_from_link(link)
return IliasPageElement.create_new(IliasElementType.FOLDER, link, "select content page") return IliasPageElement(IliasElementType.FOLDER, link, "select content page")
_unexpected_html_warning() _unexpected_html_warning()
log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.") log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.")
@ -400,16 +351,14 @@ class IliasPage:
# and just fetch the lone video url! # and just fetch the lone video url!
if len(streams) == 1: if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"] video_url = streams[0]["sources"]["mp4"][0]["src"]
return [ return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)]
IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)
]
log.explain(f"Found multiple videos for stream at {self._source_name}") log.explain(f"Found multiple videos for stream at {self._source_name}")
items = [] items = []
for stream in sorted(streams, key=lambda stream: stream["content"]): for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4" full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"] video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, full_name)) items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name))
return items return items
@ -424,26 +373,17 @@ class IliasPage:
link = self._abs_url_from_link(correct_link) link = self._abs_url_from_link(correct_link)
return IliasPageElement.create_new(IliasElementType.FORUM, link, "show all forum threads") return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads")
def _find_personal_desktop_entries(self) -> List[IliasPageElement]: def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = [] items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select("#block_pditems_0 .il-item-title") titles: List[Tag] = self._soup.select(".il-item-title")
for title in titles: for title in titles:
link = title.find("a") link = title.find("a")
if not link:
log.explain(f"Skipping offline item: {title.getText().strip()!r}")
continue
name = _sanitize_path_name(link.text.strip()) name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link) url = self._abs_url_from_link(link)
if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url:
# Configure button/link does not have anything interesting
continue
type = self._find_type_from_link(name, link, url) type = self._find_type_from_link(name, link, url)
if not type: if not type:
_unexpected_html_warning() _unexpected_html_warning()
@ -456,7 +396,7 @@ class IliasPage:
url = re.sub(r"(target=file_\d+)", r"\1_download", url) url = re.sub(r"(target=file_\d+)", r"\1_download", url)
log.explain("Rewired file URL to include download part") log.explain("Rewired file URL to include download part")
items.append(IliasPageElement.create_new(type, url, name)) items.append(IliasPageElement(type, url, name))
return items return items
@ -474,7 +414,7 @@ class IliasPage:
log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}") log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
continue continue
items.append(IliasPageElement.create_new(IliasElementType.FILE, url, name)) items.append(IliasPageElement(IliasElementType.FILE, url, name))
return items return items
@ -487,7 +427,7 @@ class IliasPage:
continue continue
if "cmd=sendfile" not in link["href"]: if "cmd=sendfile" not in link["href"]:
continue continue
items.append(IliasPageElement.create_new( items.append(IliasPageElement(
IliasElementType.FILE, IliasElementType.FILE,
self._abs_url_from_link(link), self._abs_url_from_link(link),
_sanitize_path_name(link.getText()) _sanitize_path_name(link.getText())
@ -515,9 +455,7 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params) url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next") log.explain("Found ILIAS video frame page, fetching actual content next")
return [ return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")
]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
@ -546,7 +484,7 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params) url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry") log.explain("Disabled pagination, retrying folder as a new entry")
return [IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]
def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]: def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
""" """
@ -575,9 +513,12 @@ class IliasPage:
modification_string = link.parent.parent.parent.select_one( modification_string = link.parent.parent.parent.select_one(
f"td.std:nth-child({index})" f"td.std:nth-child({index})"
).getText().strip() ).getText().strip()
if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string): if re.search(r"\d+\.\d+.\d+ - \d+:\d+", modification_string):
modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M") log.explain(f"Converting {modification_string!r}")
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
break break
else:
log.explain(f"Date has wrong format: {modification_string!r}")
if modification_time is None: if modification_time is None:
log.warn(f"Could not determine upload time for {link}") log.warn(f"Could not determine upload time for {link}")
@ -591,7 +532,7 @@ class IliasPage:
video_url = self._abs_url_from_link(link) video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}") log.explain(f"Found video {video_name!r} at {video_url}")
return IliasPageElement.create_new( return IliasPageElement(
IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
) )
@ -627,7 +568,7 @@ class IliasPage:
if date is None: if date is None:
log.warn(f"Date parsing failed for exercise entry {name!r}") log.warn(f"Date parsing failed for exercise entry {name!r}")
results.append(IliasPageElement.create_new( results.append(IliasPageElement(
IliasElementType.FILE, IliasElementType.FILE,
self._abs_url_from_link(link), self._abs_url_from_link(link),
name, name,
@ -660,22 +601,22 @@ class IliasPage:
# Two divs, side by side. Left is the name, right is the link ==> get left # Two divs, side by side. Left is the name, right is the link ==> get left
# sibling # sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip() file_name = file_link.parent.findPrevious(name="div").getText().strip()
file_name = _sanitize_path_name(file_name)
url = self._abs_url_from_link(file_link) url = self._abs_url_from_link(file_link)
log.explain(f"Found exercise entry {file_name!r}") log.explain(f"Found exercise entry {file_name!r}")
results.append(IliasPageElement.create_new( results.append(IliasPageElement(
IliasElementType.FILE, IliasElementType.FILE,
url, url,
_sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name), container_name + "/" + file_name,
mtime=None, # We do not have any timestamp None # We do not have any timestamp
skip_sanitize=True
)) ))
# Find all links to file listings (e.g. "Submitted Files" for groups) # Find all links to file listings (e.g. "Submitted Files" for groups)
file_listings: List[Tag] = container.findAll( file_listings: List[Tag] = container.findAll(
name="a", name="a",
# download links contain the given command class # download links contain the given command class
attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()} attrs={"href": lambda x: x and "cmdClass=ilexsubmissionfilegui" in x}
) )
# Add each listing as a new # Add each listing as a new
@ -686,15 +627,14 @@ class IliasPage:
label_container: Tag = parent_container.find( label_container: Tag = parent_container.find(
attrs={"class": lambda x: x and "control-label" in x} attrs={"class": lambda x: x and "control-label" in x}
) )
file_name = label_container.getText().strip() file_name = _sanitize_path_name(label_container.getText().strip())
url = self._abs_url_from_link(listing) url = self._abs_url_from_link(listing)
log.explain(f"Found exercise detail {file_name!r} at {url}") log.explain(f"Found exercise detail {file_name!r} at {url}")
results.append(IliasPageElement.create_new( results.append(IliasPageElement(
IliasElementType.EXERCISE_FILES, IliasElementType.EXERCISE_FILES,
url, url,
_sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name), container_name + "/" + file_name,
None, # we do not have any timestamp None # we do not have any timestamp
skip_sanitize=True
)) ))
return results return results
@ -702,18 +642,12 @@ class IliasPage:
def _find_normal_entries(self) -> List[IliasPageElement]: def _find_normal_entries(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = [] result: List[IliasPageElement] = []
links: List[Tag] = []
# Fetch all links and throw them to the general interpreter # Fetch all links and throw them to the general interpreter
if self._is_course_overview_page(): links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")
log.explain("Page is a course overview page, adjusting link selector")
links.extend(self._soup.select(".il-item-title > a"))
else:
links.extend(self._soup.select("a.il_ContainerItemTitle"))
for link in links: for link in links:
abs_url = self._abs_url_from_link(link) abs_url = self._abs_url_from_link(link)
# Make sure parents are sanitized. We do not want accidental parents parents = self._find_upwards_folder_hierarchy(link)
parents = [_sanitize_path_name(x) for x in self._find_upwards_folder_hierarchy(link)]
if parents: if parents:
element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText()) element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText())
@ -731,22 +665,19 @@ class IliasPage:
if not element_type: if not element_type:
continue continue
if element_type == IliasElementType.MEETING:
normalized = _sanitize_path_name(self._normalize_meeting_name(element_name))
log.explain(f"Normalized meeting name from {element_name!r} to {normalized!r}")
element_name = normalized
elif element_type == IliasElementType.FILE: elif element_type == IliasElementType.FILE:
result.append(self._file_to_element(element_name, abs_url, link)) result.append(self._file_to_element(element_name, abs_url, link))
continue continue
log.explain(f"Found {element_name!r}") log.explain(f"Found {element_name!r}")
result.append(IliasPageElement.create_new( result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
element_type,
abs_url,
element_name,
description=description,
skip_sanitize=True
))
result += self._find_cards() result += self._find_cards()
result += self._find_mediacast_videos() result += self._find_mediacast_videos()
result += self._find_mob_videos()
return result return result
@ -766,8 +697,8 @@ class IliasPage:
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'") log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue continue
videos.append(IliasPageElement.create_new( videos.append(IliasPageElement(
typ=IliasElementType.MEDIACAST_VIDEO, type=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")), url=self._abs_url_from_relative(video_element.get("src")),
name=element_name, name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td")) mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
@ -775,37 +706,6 @@ class IliasPage:
return videos return videos
def _find_mob_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for figure in self._soup.select("figure.ilc_media_cont_MediaContainerHighlighted"):
title = figure.select_one("figcaption").getText().strip() + ".mp4"
video_element = figure.select_one("video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mob video '{title}'")
continue
url = None
for source in video_element.select("source"):
if source.get("type", "") == "video/mp4":
url = source.get("src")
break
if url is None:
_unexpected_html_warning()
log.warn_contd(f"No <source> element found for mob video '{title}'")
continue
videos.append(IliasPageElement.create_new(
typ=IliasElementType.MOB_VIDEO,
url=self._abs_url_from_relative(url),
name=_sanitize_path_name(title),
mtime=None
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]: def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td") description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td: if not description_td:
@ -856,14 +756,11 @@ class IliasPage:
# ILIAS has proper accordions and weird blocks that look like normal headings, # ILIAS has proper accordions and weird blocks that look like normal headings,
# but some JS later transforms them into an accordion. # but some JS later transforms them into an accordion.
# This is for these weird JS-y blocks and custom item groups # This is for these weird JS-y blocks
if "ilContainerItemsContainer" in parent.get("class"): if "ilContainerItemsContainer" in parent.get("class"):
data_store_url = parent.parent.get("data-store-url", "").lower()
is_custom_item_group = "baseclass=ilcontainerblockpropertiesstoragegui" in data_store_url \
and "cont_block_id=" in data_store_url
# I am currently under the impression that *only* those JS blocks have an # I am currently under the impression that *only* those JS blocks have an
# ilNoDisplay class. # ilNoDisplay class.
if not is_custom_item_group and "ilNoDisplay" not in parent.get("class"): if "ilNoDisplay" not in parent.get("class"):
continue continue
prev: Tag = parent.findPreviousSibling("div") prev: Tag = parent.findPreviousSibling("div")
if "ilContainerBlockHeader" in prev.get("class"): if "ilContainerBlockHeader" in prev.get("class"):
@ -923,9 +820,7 @@ class IliasPage:
full_path = name + "." + file_type full_path = name + "." + file_type
log.explain(f"Found file {full_path!r}") log.explain(f"Found file {full_path!r}")
return IliasPageElement.create_new( return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
IliasElementType.FILE, url, full_path, modification_date, skip_sanitize=True
)
def _find_cards(self) -> List[IliasPageElement]: def _find_cards(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = [] result: List[IliasPageElement] = []
@ -942,7 +837,7 @@ class IliasPage:
log.warn_contd(f"Could not extract type for {title}") log.warn_contd(f"Could not extract type for {title}")
continue continue
result.append(IliasPageElement.create_new(type, url, name)) result.append(IliasPageElement(type, url, name))
card_button_tiles: List[Tag] = self._soup.select(".card-title button") card_button_tiles: List[Tag] = self._soup.select(".card-title button")
@ -971,7 +866,7 @@ class IliasPage:
log.warn_contd(f"Could not extract type for {button}") log.warn_contd(f"Could not extract type for {button}")
continue continue
result.append(IliasPageElement.create_new(type, url, name, description=description)) result.append(IliasPageElement(type, url, name, description=description))
return result return result
@ -998,14 +893,10 @@ class IliasPage:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]: if "exc" in icon["class"]:
return IliasElementType.EXERCISE return IliasElementType.EXERCISE
if "grp" in icon["class"]:
return IliasElementType.FOLDER
if "webr" in icon["class"]: if "webr" in icon["class"]:
return IliasElementType.LINK return IliasElementType.LINK
if "book" in icon["class"]: if "book" in icon["class"]:
return IliasElementType.BOOKING return IliasElementType.BOOKING
if "crsr" in icon["class"]:
return IliasElementType.COURSE
if "frm" in icon["class"]: if "frm" in icon["class"]:
return IliasElementType.FORUM return IliasElementType.FORUM
if "sess" in icon["class"]: if "sess" in icon["class"]:
@ -1069,19 +960,6 @@ class IliasPage:
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query: if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE return IliasElementType.SCORM_LEARNING_MODULE
# other universities might have content type specified in URL path
if "_file_" in parsed_url.path:
return IliasElementType.FILE
if "_fold_" in parsed_url.path or "_copa_" in parsed_url.path:
return IliasElementType.FOLDER
if "_frm_" in parsed_url.path:
return IliasElementType.FORUM
if "_exc_" in parsed_url.path:
return IliasElementType.EXERCISE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so # Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image. # try to guess it from the image.
@ -1166,43 +1044,31 @@ class IliasPage:
return IliasElementType.FOLDER return IliasElementType.FOLDER
@staticmethod @staticmethod
def is_logged_in(soup: BeautifulSoup) -> bool: def _normalize_meeting_name(meeting_name: str) -> str:
# Normal ILIAS pages """
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar") Normalizes meeting names, which have a relative time as their first part,
if mainbar is not None: to their date in ISO format.
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x}) """
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop # This checks whether we can reach a `:` without passing a `-`
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}): if re.search(r"^[^-]+: ", meeting_name):
return True # Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
split_delimiter = ", "
# Empty personal desktop has zero (0) markers. Match on the text... # We have a meeting day without time
if alert := soup.select_one(".alert-info"): date_portion_str = meeting_name.split(split_delimiter)[0]
text = alert.getText().lower() date_portion = demangle_date(date_portion_str)
if "you have not yet selected any favourites" in text:
return True
if "sie haben aktuell noch keine favoriten ausgewählt" in text:
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by # We failed to parse the date, bail out
# their video listing table if not date_portion:
video_table = soup.find( return meeting_name
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
return True
return False
def get_permalink(self) -> Optional[str]: # Replace the first section with the absolute date
return IliasPage.get_soup_permalink(self._soup) rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
def _abs_url_from_link(self, link_tag: Tag) -> str: def _abs_url_from_link(self, link_tag: Tag) -> str:
""" """
@ -1216,13 +1082,6 @@ class IliasPage:
""" """
return urljoin(self._page_url, relative_url) return urljoin(self._page_url, relative_url)
@staticmethod
def get_soup_permalink(soup: BeautifulSoup) -> Optional[str]:
perma_link_element: Tag = soup.select_one(".il-footer-permanent-url > a")
if not perma_link_element or not perma_link_element.get("href"):
return None
return perma_link_element.get("href")
def _unexpected_html_warning() -> None: def _unexpected_html_warning() -> None:
log.warn("Encountered unexpected HTML structure, ignoring element.") log.warn("Encountered unexpected HTML structure, ignoring element.")

File diff suppressed because it is too large Load Diff

View File

@ -1,128 +0,0 @@
from typing import Any, Optional
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError
class ShibbolethLogin:
"""
Login via shibboleth system.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = f"{self._ilias_url}/shib_login.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = form = soup.find("form", {"method": "post"})["action"]
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
await sess.post(url, data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_tokenNumber": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())

View File

@ -1,9 +1,8 @@
import os import os
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath from pathlib import PurePath
from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
from urllib.parse import urljoin from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
@ -32,24 +31,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return re.compile(regex) return re.compile(regex)
@dataclass @dataclass(unsafe_hash=True)
class KitIpdFile: class KitIpdFile:
name: str name: str
url: str url: str
def explain(self) -> None:
log.explain(f"File {self.name!r} (href={self.url!r})")
@dataclass @dataclass
class KitIpdFolder: class KitIpdFolder:
name: str name: str
entries: List[Union[KitIpdFile, "KitIpdFolder"]] files: List[KitIpdFile]
def explain(self) -> None: def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}") log.explain_topic(f"Folder {self.name!r}")
for entry in self.entries: for file in self.files:
entry.explain() log.explain(f"File {file.name!r} (href={file.url!r})")
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler): class KitIpdCrawler(HttpCrawler):
@ -73,83 +72,68 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_cl: async with maybe_cl:
for item in await self._fetch_items(): for item in await self._fetch_items():
item.explain()
if isinstance(item, KitIpdFolder): if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(PurePath("."), item)) tasks.append(self._crawl_folder(item))
else: else:
log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})") # Orphan files are placed in the root folder
log.explain("Attributing it to root folder") tasks.append(self._download_file(PurePath("."), item))
# do this here to at least be sequential and not parallel (rate limiting is hard, as the
# crawl abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(item.url)
tasks.append(self._download_file(PurePath("."), item, etag, mtime))
await self.gather(tasks) await self.gather(tasks)
async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None: async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = parent / folder.name path = PurePath(folder.name)
if not await self.crawl(path): if not await self.crawl(path):
return return
tasks = [] tasks = [self._download_file(path, file) for file in folder.files]
for entry in folder.entries:
if isinstance(entry, KitIpdFolder):
tasks.append(self._crawl_folder(path, entry))
else:
# do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
# abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(entry.url)
tasks.append(self._download_file(path, entry, etag, mtime))
await self.gather(tasks) await self.gather(tasks)
async def _download_file( async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
self,
parent: PurePath,
file: KitIpdFile,
etag: Optional[str],
mtime: Optional[datetime]
) -> None:
element_path = parent / file.name element_path = parent / file.name
maybe_dl = await self.download(element_path)
prev_etag = self._get_previous_etag_from_report(element_path)
etag_differs = None if prev_etag is None else prev_etag != etag
maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
if not maybe_dl: if not maybe_dl:
# keep storing the known file's etag
if prev_etag:
self._add_etag_to_report(element_path, prev_etag)
return return
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, element_path, sink, bar) await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]: async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page() page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page) elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
# do not add unnecessary nesting for a single <h1> heading
drop_h1: bool = len(page.find_all(name="h1")) <= 1
folder_tree: KitIpdFolder = KitIpdFolder(".", [])
for element in elements: for element in elements:
parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1) folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element, url) file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
current_folder: KitIpdFolder = folder_tree return items
for folder_name in parent.parts:
# helps the type checker to verify that current_folder is indeed a folder
def subfolders() -> Generator[KitIpdFolder, Any, None]:
return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))
if not any(entry.name == folder_name for entry in subfolders()): def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
current_folder.entries.append(KitIpdFolder(folder_name, [])) files: List[KitIpdFile] = []
current_folder = next(entry for entry in subfolders() if entry.name == folder_name) name = folder_tag.getText().strip()
current_folder.entries.append(file) container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link, url))
return folder_tree.entries return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag, url: str) -> KitIpdFile: def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link) url = self._abs_url_from_link(url, link)
@ -162,7 +146,7 @@ class KitIpdCrawler(HttpCrawler):
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str: def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href")) return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None: async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp: async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403: if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?") raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
@ -175,8 +159,6 @@ class KitIpdCrawler(HttpCrawler):
sink.done() sink.done()
self._add_etag_to_report(path, resp.headers.get("ETag"))
async def get_page(self) -> Tuple[BeautifulSoup, str]: async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request: async with self.session.get(self._url) as request:
# The web page for Algorithmen für Routenplanung contains some # The web page for Algorithmen für Routenplanung contains some

View File

@ -57,7 +57,6 @@ class OnConflict(Enum):
@dataclass @dataclass
class Heuristics: class Heuristics:
etag_differs: Optional[bool]
mtime: Optional[datetime] mtime: Optional[datetime]
@ -234,16 +233,8 @@ class OutputDirectory:
remote_newer = None remote_newer = None
# ETag should be a more reliable indicator than mtime, so we check it first
if heuristics.etag_differs is not None:
remote_newer = heuristics.etag_differs
if remote_newer:
log.explain("Remote file's entity tag differs")
else:
log.explain("Remote file's entity tag is the same")
# Python on Windows crashes when faced with timestamps around the unix epoch # Python on Windows crashes when faced with timestamps around the unix epoch
if remote_newer is None and heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970): if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer: if remote_newer:
@ -375,8 +366,6 @@ class OutputDirectory:
self, self,
remote_path: PurePath, remote_path: PurePath,
path: PurePath, path: PurePath,
*,
etag_differs: Optional[bool] = None,
mtime: Optional[datetime] = None, mtime: Optional[datetime] = None,
redownload: Optional[Redownload] = None, redownload: Optional[Redownload] = None,
on_conflict: Optional[OnConflict] = None, on_conflict: Optional[OnConflict] = None,
@ -386,7 +375,7 @@ class OutputDirectory:
MarkConflictError. MarkConflictError.
""" """
heuristics = Heuristics(etag_differs, mtime) heuristics = Heuristics(mtime)
redownload = self._redownload if redownload is None else redownload redownload = self._redownload if redownload is None else redownload
on_conflict = self._on_conflict if on_conflict is None else on_conflict on_conflict = self._on_conflict if on_conflict is None else on_conflict
local_path = self.resolve(path) local_path = self.resolve(path)
@ -426,6 +415,7 @@ class OutputDirectory:
def _update_metadata(self, info: DownloadInfo) -> None: def _update_metadata(self, info: DownloadInfo) -> None:
if mtime := info.heuristics.mtime: if mtime := info.heuristics.mtime:
log.explain(f"Setting mtime to {mtime}")
mtimestamp = mtime.timestamp() mtimestamp = mtime.timestamp()
os.utime(info.local_path, times=(mtimestamp, mtimestamp)) os.utime(info.local_path, times=(mtimestamp, mtimestamp))

View File

@ -1,6 +1,5 @@
from pathlib import Path, PurePath from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional
from urllib.parse import quote
from rich.markup import escape from rich.markup import escape
@ -169,26 +168,19 @@ class Pferd:
log.report("") log.report("")
log.report(f"[bold bright_cyan]Report[/] for {escape(name)}") log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")
def fmt_path_link(relative_path: PurePath) -> str:
# We need to URL-encode the path because it might contain spaces or special characters
absolute_path = str(crawler.output_dir.resolve(relative_path).absolute())
absolute_path = absolute_path.replace("\\\\?\\", "")
link = f"file://{quote(absolute_path)}"
return f"[link={link}]{fmt_path(relative_path)}[/link]"
something_changed = False something_changed = False
for path in sorted(crawler.report.added_files): for path in sorted(crawler.report.added_files):
something_changed = True something_changed = True
log.report(f" [bold bright_green]Added[/] {fmt_path_link(path)}") log.report(f" [bold bright_green]Added[/] {fmt_path(path)}")
for path in sorted(crawler.report.changed_files): for path in sorted(crawler.report.changed_files):
something_changed = True something_changed = True
log.report(f" [bold bright_yellow]Changed[/] {fmt_path_link(path)}") log.report(f" [bold bright_yellow]Changed[/] {fmt_path(path)}")
for path in sorted(crawler.report.deleted_files): for path in sorted(crawler.report.deleted_files):
something_changed = True something_changed = True
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}") log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files): for path in sorted(crawler.report.not_deleted_files):
something_changed = True something_changed = True
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path_link(path)}") log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings: for warning in crawler.report.encountered_warnings:
something_changed = True something_changed = True

View File

@ -110,10 +110,6 @@ class ExactReTf(Transformation):
except ValueError: except ValueError:
pass pass
named_groups: Dict[str, str] = match.groupdict()
for name, capture in named_groups.items():
locals_dir[name] = capture
result = eval(f"f{right!r}", {}, locals_dir) result = eval(f"f{right!r}", {}, locals_dir)
return Transformed(PurePath(result)) return Transformed(PurePath(result))

View File

@ -1,2 +1,2 @@
NAME = "PFERD" NAME = "PFERD"
VERSION = "3.7.0" VERSION = "3.5.0"

View File

@ -56,17 +56,6 @@ Also, you can download most ILIAS pages directly like this:
$ pferd kit-ilias-web <url> <output_directory> $ pferd kit-ilias-web <url> <output_directory>
``` ```
PFERD supports other ILIAS instances as well, using the `ilias-web` crawler (see
the [config section on `ilias-web`](CONFIG.md#the-ilias-web-crawler) for more
detail on the `base-url` and `client-id` parameters):
```
$ pferd ilias-web \
--base-url https://ilias.my-university.example \
--client-id My_University desktop \
<output_directory>
```
However, the CLI only lets you download a single thing at a time, and the However, the CLI only lets you download a single thing at a time, and the
resulting command can grow long quite quickly. Because of this, PFERD can also resulting command can grow long quite quickly. Because of this, PFERD can also
be used with a config file. be used with a config file.

8
flake.lock generated
View File

@ -2,16 +2,16 @@
"nodes": { "nodes": {
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1708979614, "lastModified": 1694499547,
"narHash": "sha256-FWLWmYojIg6TeqxSnHkKpHu5SGnFP5um1uUjH+wRV6g=", "narHash": "sha256-R7xMz1Iia6JthWRHDn36s/E248WB1/je62ovC/dUVKI=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "b7ee09cf5614b02d289cd86fcfa6f24d4e078c2a", "rev": "e5f018cf150e29aac26c61dac0790ea023c46b24",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "owner": "NixOS",
"ref": "nixos-23.11", "ref": "nixos-23.05",
"repo": "nixpkgs", "repo": "nixpkgs",
"type": "github" "type": "github"
} }

View File

@ -2,7 +2,7 @@
description = "Tool for downloading course-related files from ILIAS"; description = "Tool for downloading course-related files from ILIAS";
inputs = { inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.11"; nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
}; };
outputs = { self, nixpkgs }: outputs = { self, nixpkgs }:

11
mypy.ini Normal file
View File

@ -0,0 +1,11 @@
[mypy]
disallow_any_generics = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
no_implicit_optional = True
warn_unused_ignores = True
warn_unreachable = True
show_error_context = True
[mypy-rich.*,bs4,keyring]
ignore_missing_imports = True

View File

@ -1,42 +1,3 @@
[build-system] [build-system]
requires = ["setuptools", "wheel"] requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[project]
name = "PFERD"
dependencies = [
"aiohttp>=3.8.1",
"beautifulsoup4>=4.10.0",
"rich>=11.0.0",
"keyring>=23.5.0",
"certifi>=2021.10.8"
]
dynamic = ["version"]
requires-python = ">=3.9"
[project.scripts]
pferd = "PFERD.__main__:main"
[tool.setuptools.dynamic]
version = {attr = "PFERD.version.VERSION"}
[tool.flake8]
max-line-length = 110
[tool.isort]
line_length = 110
[tool.autopep8]
max_line_length = 110
in-place = true
recursive = true
[tool.mypy]
disallow_any_generics = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
no_implicit_optional = true
warn_unused_ignores = true
warn_unreachable = true
show_error_context = true
ignore_missing_imports = true

View File

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import re
import time import time
import re
from subprocess import run from subprocess import run

View File

@ -2,5 +2,5 @@
set -e set -e
mypy . mypy PFERD
flake8 PFERD flake8 PFERD

View File

@ -2,5 +2,5 @@
set -e set -e
autopep8 . autopep8 --recursive --in-place PFERD
isort . isort PFERD

View File

@ -13,5 +13,5 @@ pip install --upgrade setuptools
pip install --editable . pip install --editable .
# Installing tools and type hints # Installing tools and type hints
pip install --upgrade mypy flake8 flake8-pyproject autopep8 isort pyinstaller pip install --upgrade mypy flake8 autopep8 isort pyinstaller
pip install --upgrade types-chardet types-certifi pip install --upgrade types-chardet types-certifi

23
setup.cfg Normal file
View File

@ -0,0 +1,23 @@
[metadata]
name = PFERD
version = attr: PFERD.version.VERSION
[options]
packages = find:
python_requires = >=3.9
install_requires =
aiohttp>=3.8.1
beautifulsoup4>=4.10.0
rich>=11.0.0
keyring>=23.5.0
certifi>=2021.10.8
[options.entry_points]
console_scripts =
pferd = PFERD.__main__:main
[flake8]
max_line_length = 110
[isort]
line_length = 110