Compare commits

..

54 Commits

Author SHA1 Message Date
db86d23989 Merge 77c1f1516c into 533bc27439 2023-09-14 14:11:58 +02:00
533bc27439 Bump version to 3.5.0 2023-09-13 23:13:30 +02:00
0113a0ca10 Update flake.lock 2023-09-13 22:23:36 +02:00
40f8a05ad6 Add .idea to gitignore 2023-09-13 22:23:36 +02:00
50b50513c6 Ignore SCORM learning modules 2023-08-29 13:51:19 +02:00
df3514cd03 Crawl paginated past meetings 2023-08-29 12:41:21 +02:00
ad53185247 Sanitize ascii control characters on windows 2023-08-29 12:41:15 +02:00
87b67e9271 Crawl files in the info tab 2023-08-29 12:41:15 +02:00
b54b3b979c Remove size suffix for content pages 2023-08-27 11:43:05 +02:00
2184ac8040 Add support for ILIAS mediacast listings 2023-08-27 11:43:05 +02:00
b3d412360b Add Nix flake 2023-08-26 23:54:19 +02:00
dbc2553b11 Add default show-not-deleted option
If set to `no`, PFERD won't print status or report messages for not deleted files
2023-08-26 18:43:01 +02:00
68c398f1fe Add support for ILIAS learning modules 2023-08-02 13:34:54 +02:00
123a57beec Fix mypy unreachable error in file_templates 2023-07-29 18:36:33 +02:00
d204dac8ce Detect unexpected root page redirects and abort operation 2023-07-29 18:36:33 +02:00
443f7fe839 Add no-delete-prompt-overwrite crawler conflict resolution option (#75) 2023-07-29 18:36:33 +02:00
0294ceb7d5 Update github action versions 2023-03-22 00:10:54 +01:00
6f30c6583d Fix crawling of cards without descriptions 2023-03-21 23:52:33 +01:00
467fc526e8 Fix crawling of file/video cards 2023-03-21 23:52:24 +01:00
722d2eb393 Fix crawling of courses with preselected timeline tab 2023-03-21 23:36:47 +01:00
6d44aac278 Bump version to 3.4.3 2022-11-29 18:22:19 +01:00
55a2de6b88 Fix crawling English opencast 2022-11-29 18:13:56 +01:00
c0d6d8b229 Use url after redirect for relative links 2022-11-21 18:10:45 +01:00
635caa765d Fix typo
Thanks, burg113
2022-11-15 17:17:57 +01:00
e69b55b349 Add more unofficial package managers (#66) 2022-11-04 12:18:26 +01:00
07200bbde5 Document ilias web crawler's forums option 2022-10-31 14:12:27 +01:00
c020cccc64 Include found paths in "second path found" warning 2022-10-29 14:08:29 +02:00
259cfc20cc Bump version to 3.4.2 2022-10-26 18:26:17 +02:00
37b51a66d8 Update changelog 2022-10-26 18:22:37 +02:00
f47d2f11d8 Append trailing slash to kit-ipd links to ensure urljoin works as expected 2022-10-25 20:28:22 +02:00
1b6be6bd79 Handle content pages in cards 2022-10-24 18:37:26 +02:00
e1430e6298 Handle (and ignore) surveys 2022-10-24 18:37:26 +02:00
5fdd40204b Unwrap future meetings when ILIAS hides them behind a pagination 2022-10-24 14:33:58 +02:00
fb4631ba18 Fix ilias background login 2022-10-24 13:13:36 +02:00
d72fc2760b Handle empty forums 2022-10-24 13:12:17 +02:00
4a51aaa4f5 Fix forum crawling crashing for empty threads 2022-10-19 22:59:33 +02:00
66a5b1ba02 Bump version to 3.4.1 2022-08-17 13:24:01 +02:00
aa5a3a10bc Adjust changelog 2022-08-14 21:48:59 +02:00
d9b111cec2 Correctly nest description entries 2022-08-14 21:45:33 +02:00
345f52a1f6 Detect new login button 2022-08-14 21:41:29 +02:00
ed24366aba Add pass authenticator 2022-06-05 10:04:42 +02:00
46fb782798 Add forum crawling
This downloads all forum posts when needed and saves each thread in its
own html file, named after the thread title.
2022-05-24 23:43:53 +02:00
846c29aee1 Download page descriptions 2022-05-11 21:16:56 +02:00
a5015fe9b1 Correctly parse day-only meeting dates
I failed to recognize the correct format in the previous adjustment, so
this (hopefully) fixes it for good.
Meetings apparently don't always have a time portion.
2022-05-08 23:22:26 +02:00
616b0480f7 Simplify IPD crawler link regex 2022-05-08 18:18:05 +02:00
2f0e04ce13 Adjust changelog 2022-05-05 22:57:55 +02:00
bcc537468c Fix crawling of expanded meetings
The last meeting on every page is expanded by default.
Its content is then shown inline *and* in the meeting page itself.
We should skip the inline content.
2022-05-05 22:53:37 +02:00
694ffb4d77 Fix meeting date parsing
Apparently the new pattern "<relative time qualifier>: <date>," was
added. This patch adds support for it.
2022-05-05 22:28:30 +02:00
af2cc1169a Mention href for users of link_regex option 2022-05-05 14:36:03 +02:00
bc3fa36637 Fix IPD crawler crashing on weird HTML comments 2022-05-05 14:35:42 +02:00
afbd03f777 Fix docs 2022-05-05 14:35:42 +02:00
b8fe25c580 Add .cpp to ipd link regex 2022-05-04 14:19:26 +02:00
77c1f1516c Used proper plural 2021-11-02 12:41:40 +01:00
9e12e96d90 Added alias functionality 2021-11-02 03:42:08 +01:00
24 changed files with 1394 additions and 158 deletions

View File

@ -17,9 +17,9 @@ jobs:
python: ["3.9"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: actions/setup-python@v2
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}
@ -45,7 +45,7 @@ jobs:
run: mv dist/pferd* dist/pferd-${{ matrix.os }}
- name: Upload binary
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: Binaries
path: dist/pferd-${{ matrix.os }}
@ -57,7 +57,7 @@ jobs:
steps:
- name: Download binaries
uses: actions/download-artifact@v2
uses: actions/download-artifact@v3
with:
name: Binaries

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
/PFERD.egg-info/
__pycache__/
/.vscode/
/.idea/
# pyinstaller
/pferd.spec

View File

@ -22,6 +22,74 @@ ambiguous situations.
## Unreleased
## 3.5.0 - 2023-09-13
### Added
- `no-delete-prompt-override` conflict resolution strategy
- Support for ILIAS learning modules
- `show_not_deleted` option to stop printing the "Not Deleted" status or report
message. This combines nicely with the `no-delete-prompt-override` strategy,
causing PFERD to mostly ignore local-only files.
- Support for mediacast video listings
- Crawling of files in info tab
### Changed
- Remove size suffix for files in content pages
### Fixed
- Crawling of courses with the timeline view as the default tab
- Crawling of file and custom opencast cards
- Crawling of button cards without descriptions
- Abort crawling when encountering an unexpected ilias root page redirect
- Sanitize ascii control characters on Windows
- Crawling of paginated past meetings
- Ignore SCORM learning modules
## 3.4.3 - 2022-11-29
### Added
- Missing documentation for `forums` option
### Changed
- Clear up error message shown when multiple paths are found to an element
### Fixed
- IPD crawler unnecessarily appending trailing slashes
- Crawling opencast when ILIAS is set to English
## 3.4.2 - 2022-10-26
### Added
- Recognize and crawl content pages in cards
- Recognize and ignore surveys
### Fixed
- Forum crawling crashing when a thread has no messages at all
- Forum crawling crashing when a forum has no threads at all
- Ilias login failing in some cases
- Crawling of paginated future meetings
- IPD crawler handling of URLs without trailing slash
## 3.4.1 - 2022-08-17
### Added
- Download of page descriptions
- Forum download support
- `pass` authenticator
### Changed
- Add `cpp` extension to default `link_regex` of IPD crawler
- Mention hrefs in IPD crawler's `--explain` output for users of `link_regex` option
- Simplify default IPD crawler `link_regex`
### Fixed
- IPD crawler crashes on some sites
- Meeting name normalization for yesterday, today and tomorrow
- Crawling of meeting file previews
- Login with new login button html layout
- Descriptions for courses are now placed in the correct subfolder when
downloading the whole desktop
## 3.4.0 - 2022-05-01
### Added

View File

@ -26,6 +26,9 @@ default values for the other sections.
`Added ...`) while running a crawler. (Default: `yes`)
- `report`: Whether PFERD should print a report of added, changed and deleted
local files for all crawlers before exiting. (Default: `yes`)
- `show_not_deleted`: Whether PFERD should print messages in status and report
when a local-only file wasn't deleted. Combines nicely with the
`no-delete-prompt-override` conflict resolution strategy.
- `share_cookies`: Whether crawlers should share cookies where applicable. For
example, some crawlers share cookies if they crawl the same website using the
same account. (Default: `yes`)
@ -75,6 +78,9 @@ common to all crawlers:
using `prompt` and always choosing "yes".
- `no-delete`: Never delete local files, but overwrite local files if the
remote file is different.
- `no-delete-prompt-overwrite`: Never delete local files, but prompt to
overwrite local files if the remote file is different. Combines nicely
with the `show_not_deleted` option.
- `transform`: Rules for renaming and excluding certain files and directories.
For more details, see [this section](#transformation-rules). (Default: empty)
- `tasks`: The maximum number of concurrent tasks (such as crawling or
@ -86,6 +92,9 @@ common to all crawlers:
load for the crawl target. (Default: `0.0`)
- `windows_paths`: Whether PFERD should find alternative names for paths that
are invalid on Windows. (Default: `yes` on Windows, `no` otherwise)
- `aliases`: List of strings that are considered as an alias when invoking with
the `--crawler` or `-C` flag. If there is more than one crawl section with
the same aliases all are selected. Thereby, you can group different crawlers.
Some crawlers may also require credentials for authentication. To configure how
the crawler obtains its credentials, the `auth` option is used. It is set to the
@ -100,6 +109,7 @@ username = foo
password = bar
[crawl:something]
aliases = [sth, some]
type = some-complex-crawler
auth = auth:example
on_conflict = no-delete
@ -146,7 +156,7 @@ requests is likely a good idea.
- `target`: URL to a KIT-IPD page
- `link_regex`: A regex that is matched against the `href` part of links. If it
matches, the given link is downloaded as a file. This is used to extract
files from KIT-IPD pages. (Default: `^.*/[^/]*\.(?:pdf|zip|c|java)$`)
files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`)
### The `kit-ilias-web` crawler
@ -181,6 +191,7 @@ script once per day should be fine.
redirect to the actual URL. Set to a negative value to disable the automatic
redirect. (Default: `-1`)
- `videos`: Whether to download videos. (Default: `no`)
- `forums`: Whether to download forum threads. (Default: `no`)
- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default:
`20.0`)
@ -223,6 +234,23 @@ is stored in the keyring.
- `keyring_name`: The service name PFERD uses for storing credentials. (Default:
`PFERD`)
### The `pass` authenticator
This authenticator queries the [`pass` password manager][3] for a username and
password. It tries to be mostly compatible with [browserpass][4] and
[passff][5], so see those links for an overview of the format. If PFERD fails
to load your password, you can use the `--explain` flag to see why.
- `passname`: The name of the password to use (Required)
- `username_prefixes`: A comma-separated list of username line prefixes
(Default: `login,username,user`)
- `password_prefixes`: A comma-separated list of password line prefixes
(Default: `password,pass,secret`)
[3]: <https://www.passwordstore.org/> "Pass: The Standard Unix Password Manager"
[4]: <https://github.com/browserpass/browserpass-extension#organizing-password-store> "Organizing password store"
[5]: <https://github.com/passff/passff#multi-line-format> "Multi-line format"
### The `tfa` authenticator
This authenticator prompts the user on the console for a two-factor
@ -272,7 +300,7 @@ path matches `SOURCE`, it is renamed to `TARGET`.
Example: `foo/bar --> baz`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
- Converts `foo/bar` into `baz`
- Converts `foo/bar/wargl` into `bar/wargl`
- Converts `foo/bar/wargl` into `baz/wargl`
Example: `foo/bar --> !`
- Doesn't match `foo`, `a/foo/bar` or `foo/baz`
@ -316,7 +344,7 @@ is a regular expression and `TARGET` an f-string based template. If a path
matches `SOURCE`, the output path is created using `TARGET` as template.
`SOURCE` is automatically anchored.
`TARGET` uses Python's [format string syntax][3]. The *n*-th capturing group can
`TARGET` uses Python's [format string syntax][6]. The *n*-th capturing group can
be referred to as `{g<n>}` (e.g. `{g3}`). `{g0}` refers to the original path.
If capturing group *n*'s contents are a valid integer, the integer value is
available as `{i<n>}` (e.g. `{i3}`). If capturing group *n*'s contents are a
@ -337,7 +365,7 @@ Example: `f(oo+)/be?ar -re-> B{g1.upper()}H/fear`
- Converts `fooooo/bear` into `BOOOOOH/fear`
- Converts `foo/bar/baz` into `BOOH/fear/baz`
[3]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax"
[6]: <https://docs.python.org/3/library/string.html#format-string-syntax> "Format String Syntax"
### The `-name-re->` arrow

View File

@ -1,5 +1,6 @@
Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero
TheChristophe, Scriptim, thelukasprobst, Toorero,
Mr-Pine
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -47,6 +47,8 @@ def configure_logging_from_args(args: argparse.Namespace) -> None:
log.output_explain = args.explain
if args.status is not None:
log.output_status = args.status
if args.show_not_deleted is not None:
log.output_not_deleted = args.show_not_deleted
if args.report is not None:
log.output_report = args.report
@ -72,6 +74,8 @@ def configure_logging_from_config(args: argparse.Namespace, config: Config) -> N
log.output_status = config.default_section.status()
if args.report is None:
log.output_report = config.default_section.report()
if args.show_not_deleted is None:
log.output_not_deleted = config.default_section.show_not_deleted()
except ConfigOptionError as e:
log.error(str(e))
sys.exit(1)

View File

@ -5,6 +5,7 @@ from ..config import Config
from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401
from .credential_file import CredentialFileAuthenticator, CredentialFileAuthSection
from .keyring import KeyringAuthenticator, KeyringAuthSection
from .pass_ import PassAuthenticator, PassAuthSection
from .simple import SimpleAuthenticator, SimpleAuthSection
from .tfa import TfaAuthenticator
@ -19,6 +20,8 @@ AUTHENTICATORS: Dict[str, AuthConstructor] = {
CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c),
"keyring": lambda n, s, c:
KeyringAuthenticator(n, KeyringAuthSection(s)),
"pass": lambda n, s, c:
PassAuthenticator(n, PassAuthSection(s)),
"simple": lambda n, s, c:
SimpleAuthenticator(n, SimpleAuthSection(s)),
"tfa": lambda n, s, c:

98
PFERD/auth/pass_.py Normal file
View File

@ -0,0 +1,98 @@
import re
import subprocess
from typing import List, Tuple
from ..logging import log
from .authenticator import Authenticator, AuthError, AuthSection
class PassAuthSection(AuthSection):
def passname(self) -> str:
if (value := self.s.get("passname")) is None:
self.missing_value("passname")
return value
def username_prefixes(self) -> List[str]:
value = self.s.get("username_prefixes", "login,username,user")
return [prefix.lower() for prefix in value.split(",")]
def password_prefixes(self) -> List[str]:
value = self.s.get("password_prefixes", "password,pass,secret")
return [prefix.lower() for prefix in value.split(",")]
class PassAuthenticator(Authenticator):
PREFIXED_LINE_RE = r"([a-zA-Z]+):\s?(.*)" # to be used with fullmatch
def __init__(self, name: str, section: PassAuthSection) -> None:
super().__init__(name)
self._passname = section.passname()
self._username_prefixes = section.username_prefixes()
self._password_prefixes = section.password_prefixes()
async def credentials(self) -> Tuple[str, str]:
log.explain_topic("Obtaining credentials from pass")
try:
log.explain(f"Calling 'pass show {self._passname}'")
result = subprocess.check_output(["pass", "show", self._passname], text=True)
except subprocess.CalledProcessError as e:
raise AuthError(f"Failed to get password info from {self._passname}: {e}")
prefixed = {}
unprefixed = []
for line in result.strip().splitlines():
if match := re.fullmatch(self.PREFIXED_LINE_RE, line):
prefix = match.group(1).lower()
value = match.group(2)
log.explain(f"Found prefixed line {line!r} with prefix {prefix!r}, value {value!r}")
if prefix in prefixed:
raise AuthError(f"Prefix {prefix} specified multiple times")
prefixed[prefix] = value
else:
log.explain(f"Found unprefixed line {line!r}")
unprefixed.append(line)
username = None
for prefix in self._username_prefixes:
log.explain(f"Looking for username at prefix {prefix!r}")
if prefix in prefixed:
username = prefixed[prefix]
log.explain(f"Found username {username!r}")
break
password = None
for prefix in self._password_prefixes:
log.explain(f"Looking for password at prefix {prefix!r}")
if prefix in prefixed:
password = prefixed[prefix]
log.explain(f"Found password {password!r}")
break
if password is None and username is None:
log.explain("No username and password found so far")
log.explain("Using first unprefixed line as password")
log.explain("Using second unprefixed line as username")
elif password is None:
log.explain("No password found so far")
log.explain("Using first unprefixed line as password")
elif username is None:
log.explain("No username found so far")
log.explain("Using first unprefixed line as username")
if password is None:
if not unprefixed:
log.explain("Not enough unprefixed lines left")
raise AuthError("Password could not be determined")
password = unprefixed.pop(0)
log.explain(f"Found password {password!r}")
if username is None:
if not unprefixed:
log.explain("Not enough unprefixed lines left")
raise AuthError("Username could not be determined")
username = unprefixed.pop(0)
log.explain(f"Found username {username!r}")
return username, password

View File

@ -62,6 +62,11 @@ GROUP.add_argument(
action=BooleanOptionalAction,
help="crawl and download videos"
)
GROUP.add_argument(
"--forums",
action=BooleanOptionalAction,
help="crawl and download forum posts"
)
GROUP.add_argument(
"--http-timeout", "-t",
type=float,
@ -90,6 +95,8 @@ def load(
section["link_redirect_delay"] = str(args.link_redirect_delay)
if args.videos is not None:
section["videos"] = "yes" if args.videos else "no"
if args.forums is not None:
section["forums"] = "yes" if args.forums else "no"
if args.http_timeout is not None:
section["http_timeout"] = str(args.http_timeout)

View File

@ -215,6 +215,11 @@ PARSER.add_argument(
action=BooleanOptionalAction,
help="whether crawlers should share cookies where applicable"
)
PARSER.add_argument(
"--show-not-deleted",
action=BooleanOptionalAction,
help="print messages in status and report when PFERD did not delete a local only file"
)
def load_default_section(
@ -233,6 +238,8 @@ def load_default_section(
section["report"] = "yes" if args.report else "no"
if args.share_cookies is not None:
section["share_cookies"] = "yes" if args.share_cookies else "no"
if args.show_not_deleted is not None:
section["show_not_deleted"] = "yes" if args.show_not_deleted else "no"
SUBPARSERS = PARSER.add_subparsers(title="crawlers")

View File

@ -82,6 +82,9 @@ class DefaultSection(Section):
def report(self) -> bool:
return self.s.getboolean("report", fallback=True)
def show_not_deleted(self) -> bool:
return self.s.getboolean("show_not_deleted", fallback=True)
def share_cookies(self) -> bool:
return self.s.getboolean("share_cookies", fallback=True)

View File

@ -1,6 +1,10 @@
from enum import Enum
from typing import Optional
import bs4
from PFERD.utils import soupify
_link_template_plain = "{{link}}"
_link_template_fancy = """
<!DOCTYPE html>
@ -94,6 +98,71 @@ _link_template_internet_shortcut = """
URL={{link}}
""".strip()
_learning_module_template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>{{name}}</title>
</head>
<style>
* {
box-sizing: border-box;
}
.center-flex {
display: flex;
align-items: center;
justify-content: center;
}
.nav {
display: flex;
justify-content: space-between;
}
</style>
<body class="center-flex">
{{body}}
</body>
</html>
"""
def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str:
# Seems to be comments, ignore those.
for elem in body.select(".il-copg-mob-fullscreen-modal"):
elem.decompose()
nav_template = """
<div class="nav">
{{left}}
{{right}}
</div>
"""
if prev and body.select_one(".ilc_page_lnav_LeftNavigation"):
text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip()
left = f'<a href="{prev}">{text}</a>'
else:
left = "<span></span>"
if next and body.select_one(".ilc_page_rnav_RightNavigation"):
text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip()
right = f'<a href="{next}">{text}</a>'
else:
right = "<span></span>"
if top_nav := body.select_one(".ilc_page_tnav_TopNavigation"):
top_nav.replace_with(
soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode())
)
if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"):
bot_nav.replace_with(soupify(nav_template.replace(
"{{left}}", left).replace("{{right}}", right).encode())
)
body = body.prettify()
return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name)
class Links(Enum):
IGNORE = "ignore"
@ -102,24 +171,24 @@ class Links(Enum):
INTERNET_SHORTCUT = "internet-shortcut"
def template(self) -> Optional[str]:
if self == self.FANCY:
if self == Links.FANCY:
return _link_template_fancy
elif self == self.PLAINTEXT:
elif self == Links.PLAINTEXT:
return _link_template_plain
elif self == self.INTERNET_SHORTCUT:
elif self == Links.INTERNET_SHORTCUT:
return _link_template_internet_shortcut
elif self == self.IGNORE:
elif self == Links.IGNORE:
return None
raise ValueError("Missing switch case")
def extension(self) -> Optional[str]:
if self == self.FANCY:
if self == Links.FANCY:
return ".html"
elif self == self.PLAINTEXT:
elif self == Links.PLAINTEXT:
return ".txt"
elif self == self.INTERNET_SHORTCUT:
elif self == Links.INTERNET_SHORTCUT:
return ".url"
elif self == self.IGNORE:
elif self == Links.IGNORE:
return None
raise ValueError("Missing switch case")

View File

@ -0,0 +1,91 @@
from bs4 import BeautifulSoup, Comment, Tag
_STYLE_TAG_CONTENT = """
.ilc_text_block_Information {
background-color: #f5f7fa;
}
div.ilc_text_block_Standard {
margin-bottom: 10px;
margin-top: 10px;
}
span.ilc_text_inline_Strong {
font-weight: bold;
}
.accordion-head {
background-color: #f5f7fa;
padding: 0.5rem 0;
}
h3 {
margin-top: 0.5rem;
margin-bottom: 1rem;
}
br.visible-break {
margin-bottom: 1rem;
}
article {
margin: 0.5rem 0;
}
body {
padding: 1em;
grid-template-columns: 1fr min(60rem, 90%) 1fr;
line-height: 1.2;
}
"""
_ARTICLE_WORTHY_CLASSES = [
"ilc_text_block_Information",
"ilc_section_Attention",
"ilc_section_Link",
]
def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup:
head = soup.new_tag("head")
soup.insert(0, head)
simplecss_link: Tag = soup.new_tag("link")
# <link rel="stylesheet" href="https://cdn.simplecss.org/simple.css">
simplecss_link["rel"] = "stylesheet"
simplecss_link["href"] = "https://cdn.simplecss.org/simple.css"
head.append(simplecss_link)
# Basic style tags for compat
style: Tag = soup.new_tag("style")
style.append(_STYLE_TAG_CONTENT)
head.append(style)
return soup
def clean(soup: BeautifulSoup) -> BeautifulSoup:
for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES):
block.name = "article"
for block in soup.find_all("h3"):
block.name = "div"
for block in soup.find_all("h1"):
block.name = "h3"
for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"):
block.name = "h3"
block["class"] += ["accordion-head"]
for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"):
children = list(dummy.children)
if not children:
dummy.decompose()
if len(children) > 1:
continue
if isinstance(type(children[0]), Comment):
dummy.decompose()
for hrule_imposter in soup.find_all(class_="ilc_section_Separator"):
hrule_imposter.insert(0, soup.new_tag("hr"))
return soup

View File

@ -3,7 +3,7 @@ import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from typing import List, Optional, Union
from typing import Dict, List, Optional, Union, cast
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
@ -22,12 +22,18 @@ class IliasElementType(Enum):
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
INFO_TAB = "info_tab"
LEARNING_MODULE = "learning_module"
BOOKING = "booking"
MEETING = "meeting"
VIDEO = "video"
VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder"
VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
SURVEY = "survey"
SCORM_LEARNING_MODULE = "scorm_learning_module"
MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder"
MEDIACAST_VIDEO = "mediacast_video"
OPENCAST_VIDEO = "opencast_video"
OPENCAST_VIDEO_PLAYER = "opencast_video_player"
OPENCAST_VIDEO_FOLDER = "opencast_video_folder"
OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated"
@dataclass
@ -43,7 +49,8 @@ class IliasPageElement:
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)"
r"target=[a-z]+_(?P<id>\d+)",
r"mm_(?P<id>\d+)"
]
for regex in regexes:
@ -55,6 +62,29 @@ class IliasPageElement:
return self.url
@dataclass
class IliasDownloadForumData:
url: str
form_data: Dict[str, Union[str, List[str]]]
empty: bool
@dataclass
class IliasForumThread:
title: str
title_tag: Tag
content_tag: Tag
mtime: Optional[datetime]
@dataclass
class IliasLearningModulePage:
title: str
content: Tag
next_url: Optional[str]
previous_url: Optional[str]
class IliasPage:
def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
@ -63,6 +93,16 @@ class IliasPage:
self._page_type = source_element.type if source_element else None
self._source_name = source_element.name if source_element else ""
@staticmethod
def is_root_page(soup: BeautifulSoup) -> bool:
permalink = soup.find(id="current_perma_link")
if permalink is None:
return False
value = permalink.attrs.get("value")
if value is None:
return False
return "goto.php?target=root_" in value
def get_child_elements(self) -> List[IliasPageElement]:
"""
Return all child page elements you can find here.
@ -70,9 +110,9 @@ class IliasPage:
if self._is_video_player():
log.explain("Page is a video player, extracting URL")
return self._player_to_video()
if self._is_video_listing():
log.explain("Page is a video listing, searching for elements")
return self._find_video_entries()
if self._is_opencast_video_listing():
log.explain("Page is an opencast video listing, searching for elements")
return self._find_opencast_video_entries()
if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries()
@ -82,20 +122,129 @@ class IliasPage:
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
if self._is_info_tab():
log.explain("Page is info tab, searching for elements")
return self._find_info_tab_entries()
log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries()
def get_next_stage_element(self) -> Optional[IliasPageElement]:
if self._is_ilias_opencast_embedding():
return self.get_child_elements()[0]
if self._page_type == IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED:
return self._find_video_entries_paginated()[0]
def get_info_tab(self) -> Optional[IliasPageElement]:
tab: Optional[Tag] = self._soup.find(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x}
)
if tab is not None:
return IliasPageElement(
IliasElementType.INFO_TAB,
self._abs_url_from_link(tab),
"infos"
)
return None
def get_description(self) -> Optional[BeautifulSoup]:
def is_interesting_class(name: str) -> bool:
return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"]
paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class)
if not paragraphs:
return None
# Extract bits and pieces into a string and parse it again.
# This ensures we don't miss anything and weird structures are resolved
# somewhat gracefully.
raw_html = ""
for p in paragraphs:
if p.find_parent(class_=is_interesting_class):
continue
# Ignore special listings (like folder groupings)
if "ilc_section_Special" in p["class"]:
continue
raw_html += str(p) + "\n"
raw_html = f"<body>\n{raw_html}\n</body>"
return BeautifulSoup(raw_html, "html.parser")
def get_learning_module_data(self) -> Optional[IliasLearningModulePage]:
if not self._is_learning_module_page():
return None
content = self._soup.select_one("#ilLMPageContent")
title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip()
return IliasLearningModulePage(
title=title,
content=content,
next_url=self._find_learning_module_next(),
previous_url=self._find_learning_module_prev()
)
def _find_learning_module_next(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_rnavlink_RightNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def _find_learning_module_prev(self) -> Optional[str]:
for link in self._soup.select("a.ilc_page_lnavlink_LeftNavigationLink"):
url = self._abs_url_from_link(link)
if "baseClass=ilLMPresentationGUI" not in url:
continue
return url
return None
def get_download_forum_data(self) -> Optional[IliasDownloadForumData]:
form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x})
if not form:
return None
post_url = self._abs_url_from_relative(form["action"])
thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})]
form_data: Dict[str, Union[str, List[str]]] = {
"thread_ids[]": thread_ids,
"selected_cmd2": "html",
"select_cmd2": "Ausführen",
"selected_cmd": "",
}
return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0)
def get_next_stage_element(self) -> Optional[IliasPageElement]:
if self._is_forum_page():
if "trows=800" in self._page_url:
return None
log.explain("Requesting *all* forum threads")
return self._get_show_max_forum_entries_per_page_url()
if self._is_ilias_opencast_embedding():
log.explain("Unwrapping opencast embedding")
return self.get_child_elements()[0]
if self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED:
log.explain("Unwrapping video pagination")
return self._find_opencast_video_entries_paginated()[0]
if self._contains_collapsed_future_meetings():
log.explain("Requesting *all* future meetings")
return self._uncollapse_future_meetings_url()
if not self._is_content_tab_selected():
if self._page_type != IliasElementType.INFO_TAB:
log.explain("Selecting content tab")
return self._select_content_page_url()
else:
log.explain("Crawling info tab, skipping content select")
return None
def _is_forum_page(self) -> bool:
read_more_btn = self._soup.find(
"button",
attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x}
)
return read_more_btn is not None
def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup)
def _is_video_listing(self) -> bool:
def _is_opencast_video_listing(self) -> bool:
if self._is_ilias_opencast_embedding():
return True
@ -135,6 +284,50 @@ class IliasPage:
return False
return "target=copa_" in link.get("value")
def _is_learning_module_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=pg_" in link.get("value")
def _contains_collapsed_future_meetings(self) -> bool:
return self._uncollapse_future_meetings_url() is not None
def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]:
element = self._soup.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}
)
if not element:
return None
link = self._abs_url_from_link(element)
return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings")
def _is_content_tab_selected(self) -> bool:
return self._select_content_page_url() is None
def _is_info_tab(self) -> bool:
might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None
return self._page_type == IliasElementType.INFO_TAB and might_be_info
def _select_content_page_url(self) -> Optional[IliasPageElement]:
tab = self._soup.find(
id="tab_view_content",
attrs={"class": lambda x: x is not None and "active" not in x}
)
# Already selected (or not found)
if not tab:
return None
link = tab.find("a")
if link:
link = self._abs_url_from_link(link)
return IliasPageElement(IliasElementType.FOLDER, link, "select content page")
_unexpected_html_warning()
log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.")
log.warn_contd("PFERD might not find content on the course's main page.")
return None
def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
@ -158,17 +351,30 @@ class IliasPage:
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name))
return items
def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]:
correct_link = self._soup.find(
"a",
attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x}
)
if not correct_link:
return None
link = self._abs_url_from_link(correct_link)
return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads")
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
@ -200,7 +406,8 @@ class IliasPage:
for link in links:
url = self._abs_url_from_link(link)
name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "")
name = _sanitize_path_name(name)
if "file_id" not in url:
_unexpected_html_warning()
@ -211,7 +418,24 @@ class IliasPage:
return items
def _find_video_entries(self) -> List[IliasPageElement]:
def _find_info_tab_entries(self) -> List[IliasPageElement]:
items = []
links: List[Tag] = self._soup.select("a.il_ContainerItemCommand")
for link in links:
if "cmdClass=ilobjcoursegui" not in link["href"]:
continue
if "cmd=sendfile" not in link["href"]:
continue
items.append(IliasPageElement(
IliasElementType.FILE,
self._abs_url_from_link(link),
_sanitize_path_name(link.getText())
))
return items
def _find_opencast_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated
@ -231,27 +455,27 @@ class IliasPage:
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = url_set_query_params(url, query_params)
log.explain("Found ILIAS video frame page, fetching actual content next")
return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_video_entries_paginated()
return self._find_opencast_video_entries_paginated()
return self._find_video_entries_no_paging()
return self._find_opencast_video_entries_no_paging()
def _find_video_entries_paginated(self) -> List[IliasPageElement]:
def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
log.warn("Couldn't increase elements per page (table not found). I might miss elements.")
return self._find_video_entries_no_paging()
return self._find_opencast_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None:
log.warn("Couldn't increase elements per page (table id not found). I might miss elements.")
return self._find_video_entries_no_paging()
return self._find_opencast_video_entries_no_paging()
table_id = id_match.group(1)
@ -260,25 +484,25 @@ class IliasPage:
url = url_set_query_params(self._page_url, query_params)
log.explain("Disabled pagination, retrying folder as a new entry")
return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")]
def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
# Video start links are marked with an "Abspielen" link
video_links: List[Tag] = self._soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*")
name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")
)
results: List[IliasPageElement] = []
for link in video_links:
results.append(self._listed_video_to_element(link))
results.append(self._listed_opencast_video_to_element(link))
return results
def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
def _listed_opencast_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th or 7th child (1 indexed) is the modification time string. Try to find it
# by parsing backwards from the end and finding something that looks like a date
@ -305,7 +529,9 @@ class IliasPage:
video_url = self._abs_url_from_link(link)
log.explain(f"Found video {video_name!r} at {video_url}")
return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
return IliasPageElement(
IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time
)
def _find_exercise_entries(self) -> List[IliasPageElement]:
if self._soup.find(id="tab_submission"):
@ -428,6 +654,12 @@ class IliasPage:
element_type = self._find_type_from_link(element_name, link, abs_url)
description = self._find_link_description(link)
# The last meeting on every page is expanded by default.
# Its content is then shown inline *and* in the meeting page itself.
# We should skip the inline content.
if element_type != IliasElementType.MEETING and self._is_in_expanded_meeting(link):
continue
if not element_type:
continue
if element_type == IliasElementType.MEETING:
@ -442,9 +674,68 @@ class IliasPage:
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
result += self._find_mediacast_videos()
return result
def _find_mediacast_videos(self) -> List[IliasPageElement]:
videos: List[IliasPageElement] = []
for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")):
element_name = _sanitize_path_name(
elem.select_one(".ilPlayerPreviewDescription").getText().strip()
)
if not element_name.endswith(".mp4"):
# just to make sure it has some kinda-alrightish ending
element_name = element_name + ".mp4"
video_element = elem.find(name="video")
if not video_element:
_unexpected_html_warning()
log.warn_contd(f"No <video> element found for mediacast video '{element_name}'")
continue
videos.append(IliasPageElement(
type=IliasElementType.MEDIACAST_VIDEO,
url=self._abs_url_from_relative(video_element.get("src")),
name=element_name,
mtime=self._find_mediacast_video_mtime(elem.findParent(name="td"))
))
return videos
def _find_mediacast_video_mtime(self, enclosing_td: Tag) -> Optional[datetime]:
description_td: Tag = enclosing_td.findPreviousSibling("td")
if not description_td:
return None
meta_tag: Tag = description_td.find_all("p")[-1]
if not meta_tag:
return None
updated_str = meta_tag.getText().strip().replace("\n", " ")
updated_str = re.sub(".+?: ", "", updated_str)
return demangle_date(updated_str)
def _is_in_expanded_meeting(self, tag: Tag) -> bool:
"""
Returns whether a file is part of an expanded meeting.
Has false positives for meetings themselves as their title is also "in the expanded meeting content".
It is in the same general div and this whole thing is guesswork.
Therefore, you should check for meetings before passing them in this function.
"""
parents: List[Tag] = list(tag.parents)
for parent in parents:
if not parent.get("class"):
continue
# We should not crawl files under meetings
if "ilContainerListItemContentCB" in parent.get("class"):
link: Tag = parent.parent.find("a")
type = IliasPage._find_type_from_folder_like(link, self._page_url)
return type == IliasElementType.MEETING
return False
def _find_upwards_folder_hierarchy(self, tag: Tag) -> List[str]:
"""
Interprets accordions and expandable blocks as virtual folders and returns them
@ -561,7 +852,11 @@ class IliasPage:
"div",
attrs={"class": lambda x: x and "caption" in x},
)
description = caption_parent.find_next_sibling("div").getText().strip()
caption_container = caption_parent.find_next_sibling("div")
if caption_container:
description = caption_container.getText().strip()
else:
description = None
if not type:
_unexpected_html_warning()
@ -591,8 +886,8 @@ class IliasPage:
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"]:
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if "opencast" in icon["class"] or "xoct" in icon["class"]:
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
@ -607,6 +902,14 @@ class IliasPage:
return IliasElementType.TEST
if "fold" in icon["class"]:
return IliasElementType.FOLDER
if "copa" in icon["class"]:
return IliasElementType.FOLDER
if "svy" in icon["class"]:
return IliasElementType.SURVEY
if "file" in icon["class"]:
return IliasElementType.FILE
if "mcst" in icon["class"]:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
@ -645,6 +948,15 @@ class IliasPage:
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
if "baseClass=ilLMPresentationGUI" in parsed_url.query:
return IliasElementType.LEARNING_MODULE
if "baseClass=ilMediaCastHandlerGUI" in parsed_url.query:
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if "baseClass=ilSAHSPresentationGUI" in parsed_url.query:
return IliasElementType.SCORM_LEARNING_MODULE
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
@ -686,13 +998,21 @@ class IliasPage:
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
is_session_expansion_button = found_parent.find(
"a",
attrs={"href": lambda x: x and ("crs_next_sess=" in x or "crs_prev_sess=" in x)}
)
if img_tag is None and is_session_expansion_button:
log.explain("Found session expansion button, skipping it as it has no content")
return None
if img_tag is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
return IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
@ -712,6 +1032,12 @@ class IliasPage:
if str(img_tag["src"]).endswith("icon_tst.svg"):
return IliasElementType.TEST
if str(img_tag["src"]).endswith("icon_mcst.svg"):
return IliasElementType.MEDIACAST_VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_sahs.svg"):
return IliasElementType.SCORM_LEARNING_MODULE
return IliasElementType.FOLDER
@staticmethod
@ -720,17 +1046,26 @@ class IliasPage:
Normalizes meeting names, which have a relative time as their first part,
to their date in ISO format.
"""
date_portion_str = meeting_name.split(" - ")[0]
# This checks whether we can reach a `:` without passing a `-`
if re.search(r"^[^-]+: ", meeting_name):
# Meeting name only contains date: "05. Jan 2000:"
split_delimiter = ":"
else:
# Meeting name contains date and start/end times: "05. Jan 2000, 16:00 - 17:30:"
split_delimiter = ", "
# We have a meeting day without time
date_portion_str = meeting_name.split(split_delimiter)[0]
date_portion = demangle_date(date_portion_str)
# We failed to parse the date, bail out
if not date_portion:
return meeting_name
rest_of_name = meeting_name
if rest_of_name.startswith(date_portion_str):
rest_of_name = rest_of_name[len(date_portion_str):]
return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name
# Replace the first section with the absolute date
rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:])
return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name
def _abs_url_from_link(self, link_tag: Tag) -> str:
"""
@ -755,35 +1090,47 @@ english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep',
def demangle_date(date_str: str, fail_silently: bool = False) -> Optional[datetime]:
"""
Demangle a given date in one of the following formats:
Demangle a given date in one of the following formats (hour/minute part is optional):
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon yyyy, HH:MM
"""
try:
# Normalize whitespace because users
date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
date_str = date_str.strip()
for german, english in zip(german_months, english_months):
date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english)
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm"
day_part, time_part = date_str.split(",")
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm" or "dd. mmm yyyy"
# Check if we have a time as well
if ", " in date_str:
day_part, time_part = date_str.split(",")
else:
day_part = date_str.split(",")[0]
time_part = None
day_str, month_str, year_str = day_part.split(" ")
day = int(day_str.strip().replace(".", ""))
month = english_months.index(month_str.strip()) + 1
year = int(year_str.strip())
hour_str, minute_str = time_part.split(":")
hour = int(hour_str)
minute = int(minute_str)
if time_part:
hour_str, minute_str = time_part.split(":")
hour = int(hour_str)
minute = int(minute_str)
return datetime(year, month, day, hour, minute)
return datetime(year, month, day, hour, minute)
return datetime(year, month, day)
except Exception:
if not fail_silently:
log.warn(f"Date parsing failed for {date_str!r}")
@ -805,3 +1152,45 @@ def _tomorrow() -> date:
def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-").strip()
def parse_ilias_forum_export(forum_export: BeautifulSoup) -> List[IliasForumThread]:
elements = []
for p in forum_export.select("body > p"):
title_tag = p
content_tag = p.find_next_sibling("ul")
if not content_tag:
# ILIAS allows users to delete the initial post while keeping the thread open
# This produces empty threads without *any* content.
# I am not sure why you would want this, but ILIAS makes it easy to do.
continue
title = p.find("b").text
if ":" in title:
title = title[title.find(":") + 1:]
title = title.strip()
mtime = _guess_timestamp_from_forum_post_content(content_tag)
elements.append(IliasForumThread(title, title_tag, content_tag, mtime))
return elements
def _guess_timestamp_from_forum_post_content(content: Tag) -> Optional[datetime]:
posts: Optional[Tag] = content.select(".ilFrmPostHeader > span.small")
if not posts:
return None
newest_date: Optional[datetime] = None
for post in posts:
text = post.text.strip()
text = text[text.rfind("|") + 1:]
date = demangle_date(text, fail_silently=True)
if not date:
continue
if not newest_date or newest_date < date:
newest_date = date
return newest_date

View File

@ -1,8 +1,11 @@
import asyncio
import base64
import os
import re
from collections.abc import Awaitable, Coroutine
from pathlib import PurePath
from typing import Any, Callable, Dict, List, Optional, Set, Union, cast
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Union, cast
from urllib.parse import urljoin
import aiohttp
import yarl
@ -16,11 +19,19 @@ from ...output_dir import FileSink, Redownload
from ...utils import fmt_path, soupify, url_set_query_param
from ..crawler import AWrapped, CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical
from ..http_crawler import HttpCrawler, HttpCrawlerSection
from .file_templates import Links
from .kit_ilias_html import IliasElementType, IliasPage, IliasPageElement
from .file_templates import Links, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
TargetType = Union[str, int]
_ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful():
pass
class KitIliasWebCrawlerSection(HttpCrawlerSection):
def target(self) -> TargetType:
@ -34,7 +45,7 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
if target == "desktop":
# Full personal desktop
return target
if target.startswith("https://ilias.studium.kit.edu"):
if target.startswith(_ILIAS_URL):
# ILIAS URL
return target
@ -66,21 +77,28 @@ class KitIliasWebCrawlerSection(HttpCrawlerSection):
def videos(self) -> bool:
return self.s.getboolean("videos", fallback=False)
def forums(self) -> bool:
return self.s.getboolean("forums", fallback=False)
_DIRECTORY_PAGES: Set[IliasElementType] = set([
IliasElementType.EXERCISE,
IliasElementType.EXERCISE_FILES,
IliasElementType.FOLDER,
IliasElementType.INFO_TAB,
IliasElementType.MEETING,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
])
_VIDEO_ELEMENTS: Set[IliasElementType] = set([
IliasElementType.VIDEO,
IliasElementType.VIDEO_PLAYER,
IliasElementType.VIDEO_FOLDER,
IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED,
IliasElementType.MEDIACAST_VIDEO_FOLDER,
IliasElementType.MEDIACAST_VIDEO,
IliasElementType.OPENCAST_VIDEO,
IliasElementType.OPENCAST_VIDEO_PLAYER,
IliasElementType.OPENCAST_VIDEO_FOLDER,
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED,
])
@ -176,13 +194,14 @@ instance's greatest bottleneck.
section.tfa_auth(authenticators),
)
self._base_url = "https://ilias.studium.kit.edu"
self._base_url = _ILIAS_URL
self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay()
self._links = section.links()
self._videos = section.videos()
self._visited_urls: Set[str] = set()
self._forums = section.forums()
self._visited_urls: Dict[str, PurePath] = dict()
async def _run(self) -> None:
if isinstance(self._target, int):
@ -215,27 +234,46 @@ instance's greatest bottleneck.
cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling url")
async def gather_elements() -> None:
elements.clear()
async with cl:
soup = await self._get_page(url)
if expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"):
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
next_stage_url: Optional[str] = url
current_parent = None
# Duplicated code, but the root page is special - we want to avoid fetching it twice!
log.explain_topic("Parsing root HTML page")
log.explain(f"URL: {url}")
page = IliasPage(soup, url, None)
while next_stage_url:
soup = await self._get_page(next_stage_url, root_page_allowed=True)
if current_parent is None and expected_id is not None:
perma_link_element: Tag = soup.find(id="current_perma_link")
if not perma_link_element or "crs_" not in perma_link_element.get("value"):
raise CrawlError("Invalid course id? Didn't find anything looking like a course")
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
page = IliasPage(soup, next_stage_url, current_parent)
if next_element := page.get_next_stage_element():
current_parent = next_element
next_stage_url = next_element.url
else:
next_stage_url = None
elements.extend(page.get_child_elements())
if info_tab := page.get_info_tab():
elements.append(info_tab)
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(PurePath("."), description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
@ -265,6 +303,8 @@ instance's greatest bottleneck.
cl: CrawlToken,
) -> None:
elements: List[IliasPageElement] = []
# A list as variable redefinitions are not propagated to outer scopes
description: List[BeautifulSoup] = []
@_iorepeat(3, "crawling folder")
async def gather_elements() -> None:
@ -285,10 +325,15 @@ instance's greatest bottleneck.
next_stage_url = None
elements.extend(page.get_child_elements())
if description_string := page.get_description():
description.append(description_string)
# Fill up our task list with the found elements
await gather_elements()
if description:
await self._download_description(cl.path, description[0])
elements.sort(key=lambda e: e.id())
tasks: List[Awaitable[None]] = []
@ -312,42 +357,72 @@ instance's greatest bottleneck.
) -> Optional[Coroutine[Any, Any, None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. Aborting subpath"
f"Found second path to element {element.name!r} at {element.url!r}. "
+ f"First path: {fmt_path(self._visited_urls[element.url])}. "
+ f"Second path: {fmt_path(parent_path)}."
)
self._visited_urls.add(element.url)
self._visited_urls[element.url] = parent_path
element_path = PurePath(parent_path, element.name)
if element.type in _VIDEO_ELEMENTS:
log.explain_topic(f"Decision: Crawl video element {fmt_path(element_path)}")
if not self._videos:
log.explain("Video crawling is disabled")
log.explain("Answer: no")
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'videos')"
)
return None
else:
log.explain("Video crawling is enabled")
log.explain("Answer: yes")
if element.type == IliasElementType.FILE:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.FORUM:
log.explain_topic(f"Decision: Crawl {fmt_path(element_path)}")
log.explain("Forums are not supported")
log.explain("Answer: No")
return None
if not self._forums:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](enable with option 'forums')"
)
return None
return await self._handle_forum(element, element_path)
elif element.type == IliasElementType.TEST:
log.explain_topic(f"Decision: Crawl {fmt_path(element_path)}")
log.explain("Tests contain no relevant files")
log.explain("Answer: No")
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](tests contain no relevant data)"
)
return None
elif element.type == IliasElementType.SURVEY:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](surveys contain no relevant data)"
)
return None
elif element.type == IliasElementType.SCORM_LEARNING_MODULE:
log.status(
"[bold bright_black]",
"Ignored",
fmt_path(element_path),
"[bright_black](scorm learning modules are not supported)"
)
return None
elif element.type == IliasElementType.LEARNING_MODULE:
return await self._handle_learning_module(element, element_path)
elif element.type == IliasElementType.LINK:
return await self._handle_link(element, element_path)
elif element.type == IliasElementType.BOOKING:
return await self._handle_booking(element, element_path)
elif element.type == IliasElementType.VIDEO:
elif element.type == IliasElementType.OPENCAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.OPENCAST_VIDEO_PLAYER:
return await self._handle_opencast_video(element, element_path)
elif element.type == IliasElementType.MEDIACAST_VIDEO:
return await self._handle_file(element, element_path)
elif element.type == IliasElementType.VIDEO_PLAYER:
return await self._handle_video(element, element_path)
elif element.type in _DIRECTORY_PAGES:
return await self._handle_ilias_page(element.url, element, element_path)
else:
@ -425,6 +500,19 @@ instance's greatest bottleneck.
return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(1, "downloading description")
async def _download_description(self, parent_path: PurePath, description: BeautifulSoup) -> None:
path = parent_path / "Description.html"
dl = await self.download(path, redownload=Redownload.ALWAYS)
if not dl:
return
async with dl as (bar, sink):
description = clean(insert_base_markup(description))
sink.file.write(description.prettify().encode("utf-8"))
sink.done()
@anoncritical
@_iorepeat(3, "resolving booking")
async def _download_booking(
@ -451,7 +539,7 @@ instance's greatest bottleneck.
raise CrawlError("resolve_link_target failed even after authenticating")
async def _handle_video(
async def _handle_opencast_video(
self,
element: IliasPageElement,
element_path: PurePath,
@ -472,18 +560,18 @@ instance's greatest bottleneck.
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
if not maybe_dl or self._all_opencast_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# during dleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
for video in self._previous_contained_opencast_videos(element_path):
await self.download(video)
return None
return self._download_video(element_path, element, maybe_dl)
return self._download_opencast_video(element_path, element, maybe_dl)
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
def _previous_contained_opencast_videos(self, video_path: PurePath) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(str(video_path))
@ -493,12 +581,12 @@ instance's greatest bottleneck.
folder = video_path.parent
return [PurePath(folder, name) for name in names]
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
def _all_opencast_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_opencast_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._to_local_video_path(video)
transformed_path = self._to_local_opencast_video_path(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
@ -508,14 +596,14 @@ instance's greatest bottleneck.
log.explain("Missing at least one video, continuing with requests!")
return False
def _to_local_video_path(self, path: PurePath) -> Optional[PurePath]:
def _to_local_opencast_video_path(self, path: PurePath) -> Optional[PurePath]:
if transformed := self._transformer.transform(path):
return self._deduplicator.fixup_path(transformed)
return None
@anoncritical
@_iorepeat(3, "downloading video")
async def _download_video(
async def _download_opencast_video(
self,
original_path: PurePath,
element: IliasPageElement,
@ -532,7 +620,7 @@ instance's greatest bottleneck.
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
transformed_path = self._to_local_video_path(original_path)
transformed_path = self._to_local_opencast_video_path(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
@ -606,12 +694,208 @@ instance's greatest bottleneck.
if not await try_stream():
raise CrawlError("File streaming failed after authenticate()")
async def _get_page(self, url: str) -> BeautifulSoup:
async def _handle_forum(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
return self._crawl_forum(element, maybe_cl)
@_iorepeat(3, "crawling forum")
@anoncritical
async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasForumThread] = []
async with cl:
next_stage_url = element.url
while next_stage_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}")
soup = await self._get_page(next_stage_url)
page = IliasPage(soup, next_stage_url, element)
if next := page.get_next_stage_element():
next_stage_url = next.url
else:
break
download_data = page.get_download_forum_data()
if not download_data:
raise CrawlWarning("Failed to extract forum data")
if download_data.empty:
log.explain("Forum had no threads")
elements = []
return
html = await self._post_authenticated(download_data.url, download_data.form_data)
elements = parse_ilias_forum_export(soupify(html))
elements.sort(key=lambda elem: elem.title)
tasks: List[Awaitable[None]] = []
for elem in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem)))
# And execute them
await self.gather(tasks)
@anoncritical
@_iorepeat(3, "saving forum thread")
async def _download_forum_thread(
self,
parent_path: PurePath,
element: IliasForumThread,
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path, mtime=element.mtime)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
content = element.title_tag.prettify()
content += element.content_tag.prettify()
sink.file.write(content.encode("utf-8"))
sink.done()
async def _handle_learning_module(
self,
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]:
maybe_cl = await self.crawl(element_path)
if not maybe_cl:
return None
return self._crawl_learning_module(element, maybe_cl)
@_iorepeat(3, "crawling learning module")
@anoncritical
async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None:
elements: List[IliasLearningModulePage] = []
async with cl:
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}")
log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element)
if next := page.get_learning_module_data():
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.previous_url, "left", element
))
elements.append(next)
elements.extend(await self._crawl_learning_module_direction(
cl.path, next.next_url, "right", element
))
# Reflect their natural ordering in the file names
for index, lm_element in enumerate(elements):
lm_element.title = f"{index:02}_{lm_element.title}"
tasks: List[Awaitable[None]] = []
for index, elem in enumerate(elements):
prev_url = elements[index - 1].title if index > 0 else None
next_url = elements[index + 1].title if index < len(elements) - 1 else None
tasks.append(asyncio.create_task(
self._download_learning_module_page(cl.path, elem, prev_url, next_url)
))
# And execute them
await self.gather(tasks)
async def _crawl_learning_module_direction(
self,
path: PurePath,
start_url: Optional[str],
dir: Union[Literal["left"], Literal["right"]],
parent_element: IliasPageElement
) -> List[IliasLearningModulePage]:
elements: List[IliasLearningModulePage] = []
if not start_url:
return elements
next_element_url: Optional[str] = start_url
counter = 0
while next_element_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element)
if next := page.get_learning_module_data():
elements.append(next)
if dir == "left":
next_element_url = next.previous_url
else:
next_element_url = next.next_url
counter += 1
return elements
@anoncritical
@_iorepeat(3, "saving learning module page")
async def _download_learning_module_page(
self,
parent_path: PurePath,
element: IliasLearningModulePage,
prev: Optional[str],
next: Optional[str]
) -> None:
path = parent_path / (_sanitize_path_name(element.title) + ".html")
maybe_dl = await self.download(path)
if not maybe_dl:
return
my_path = self._transformer.transform(maybe_dl.path)
if not my_path:
return
if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p:
prev = os.path.relpath(prev_p, my_path.parent)
else:
prev = None
if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html"))
if next_p:
next = os.path.relpath(next_p, my_path.parent)
else:
next = None
async with maybe_dl as (bar, sink):
content = element.content
content = await self.internalize_images(content)
sink.file.write(learning_module_template(content, maybe_dl.path.name, prev, next).encode("utf-8"))
sink.done()
async def internalize_images(self, tag: Tag) -> Tag:
"""
Tries to fetch ILIAS images and embed them as base64 data.
"""
log.explain_topic("Internalizing images")
for elem in tag.find_all(recursive=True):
if not isinstance(elem, Tag):
continue
if elem.name == "img":
if src := elem.attrs.get("src", None):
url = urljoin(_ILIAS_URL, src)
if not url.startswith(_ILIAS_URL):
continue
log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode()
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"):
# For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + elem.attrs["src"]
return tag
async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup:
auth_id = await self._current_auth_id()
async with self.session.get(url) as request:
soup = soupify(await request.read())
if self._is_logged_in(soup):
return soup
return self._verify_page(soup, url, root_page_allowed)
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
@ -620,21 +904,73 @@ instance's greatest bottleneck.
async with self.session.get(url) as request:
soup = soupify(await request.read())
if self._is_logged_in(soup):
return soup
raise CrawlError("get_page failed even after authenticating")
return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}")
def _verify_page(self, soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
if IliasPage.is_root_page(soup) and not root_page_allowed:
raise CrawlError(
"Unexpectedly encountered ILIAS root page. "
"This usually happens because the ILIAS instance is broken. "
"If so, wait a day or two and try again. "
"It could also happen because a crawled element links to the ILIAS root page. "
"If so, use a transform with a ! as target to ignore the particular element. "
f"The redirect came from {url}"
)
return soup
async def _post_authenticated(
self,
url: str,
data: dict[str, Union[str, List[str]]]
) -> bytes:
auth_id = await self._current_auth_id()
form_data = aiohttp.FormData()
for key, val in data.items():
form_data.add_field(key, val)
async with self.session.post(url, data=form_data(), allow_redirects=False) as request:
if request.status == 200:
return await request.read()
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.post(url, data=data, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("post_authenticated failed even after authenticating")
async def _get_authenticated(self, url: str) -> bytes:
auth_id = await self._current_auth_id()
async with self.session.get(url, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
# We weren't authenticated, so try to do that
await self.authenticate(auth_id)
# Retry once after authenticating. If this fails, we will die.
async with self.session.get(url, allow_redirects=False) as request:
if request.status == 200:
return await request.read()
raise CrawlError("get_authenticated failed even after authenticating")
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login", failure_is_error=True)
@ _iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
@staticmethod
@ staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find("button", attrs={"data-action": lambda x: x and "login.php" in x})
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
@ -679,14 +1015,17 @@ class KitShibbolethLogin:
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
url = "https://ilias.studium.kit.edu/shib_login.php"
url = f"{_ILIAS_URL}/shib_login.php"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"il_target": "",
"home_organization_selection": "Weiter",
}
soup: BeautifulSoup = await _shib_post(sess, url, data)
soup: Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful] = await _shib_post(sess, url, data)
if isinstance(soup, KitShibbolethBackgroundLoginSuccessful):
return
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
@ -725,7 +1064,7 @@ class KitShibbolethLogin:
# (or clicking "Continue" if you have JS disabled)
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
url = f"{_ILIAS_URL}/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
@ -774,22 +1113,35 @@ async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> Beautifu
return soupify(await response.read())
async def _shib_post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async def _shib_post(
session: aiohttp.ClientSession,
url: str,
data: Any
) -> Union[BeautifulSoup, KitShibbolethBackgroundLoginSuccessful]:
"""
aiohttp unescapes '/' and ':' in URL query parameters which is not RFC compliant and rejected
by Shibboleth. Thanks a lot. So now we unroll the requests manually, parse location headers and
build encoded URL objects ourselves... Who thought mangling location header was a good idea??
"""
log.explain_topic("Shib login POST")
async with session.post(url, data=data, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Got location {location!r}")
if not location:
raise CrawlWarning(f"Login failed (1), no location header present at {url}")
correct_url = yarl.URL(location, encoded=True)
log.explain(f"Corrected location to {correct_url!r}")
if str(correct_url).startswith(_ILIAS_URL):
log.explain("ILIAS recognized our shib token and logged us in in the background, returning")
return KitShibbolethBackgroundLoginSuccessful()
async with session.get(correct_url, allow_redirects=False) as response:
location = response.headers.get("location")
log.explain(f"Redirected to {location!r} with status {response.status}")
# If shib still still has a valid session, it will directly respond to the request
if location is None:
log.explain("Shib recognized us, returning its response directly")
return soupify(await response.read())
as_yarl = yarl.URL(response.url)
@ -803,6 +1155,7 @@ async def _shib_post(session: aiohttp.ClientSession, url: str, data: Any) -> Bea
path=location,
encoded=True
)
log.explain(f"Corrected location to {correct_url!r}")
async with session.get(correct_url, allow_redirects=False) as response:
return soupify(await response.read())

View File

@ -2,7 +2,7 @@ import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Union
from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@ -27,7 +27,7 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return target
def link_regex(self) -> Pattern[str]:
regex = self.s.get("link_regex", r"^.*/[^/]*\.(?:pdf|zip|c|java)$")
regex = self.s.get("link_regex", r"^.*?[^/]+\.(pdf|zip|c|cpp|java)$")
return re.compile(regex)
@ -45,7 +45,7 @@ class KitIpdFolder:
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r}")
log.explain(f"File {file.name!r} (href={file.url!r})")
def __hash__(self) -> int:
return self.name.__hash__()
@ -99,32 +99,32 @@ class KitIpdCrawler(HttpCrawler):
await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page = await self.get_page()
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label)
folder = self._extract_folder(folder_label, url)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element)
file = self._extract_file(element, url)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r}")
log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
log.explain("Attributing it to root folder")
return items
def _extract_folder(self, folder_tag: Tag) -> KitIpdFolder:
def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link))
files.append(self._extract_file(link, url))
return KitIpdFolder(name, files)
@ -135,16 +135,16 @@ class KitIpdCrawler(HttpCrawler):
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag) -> KitIpdFile:
url = self._abs_url_from_link(link)
def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link)
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _abs_url_from_link(self, link_tag: Tag) -> str:
return urljoin(self._url, link_tag.get("href"))
def _abs_url_from_link(self, url: str, link_tag: Tag) -> str:
return urljoin(url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
@ -159,6 +159,12 @@ class KitIpdCrawler(HttpCrawler):
sink.done()
async def get_page(self) -> BeautifulSoup:
async def get_page(self) -> Tuple[BeautifulSoup, str]:
async with self.session.get(self._url) as request:
return soupify(await request.read())
# The web page for Algorithmen für Routenplanung contains some
# weird comments that beautifulsoup doesn't parse correctly. This
# hack enables those pages to be crawled, and should hopefully not
# cause issues on other pages.
content = (await request.read()).decode("utf-8")
content = re.sub(r"<!--.*?-->", "", content)
return soupify(content.encode("utf-8")), str(request.url)

View File

@ -14,7 +14,7 @@ def name_variants(path: PurePath) -> Iterator[PurePath]:
class Deduplicator:
FORBIDDEN_CHARS = '<>:"/\\|?*'
FORBIDDEN_CHARS = '<>:"/\\|?*' + "".join([chr(i) for i in range(0, 32)])
FORBIDDEN_NAMES = {
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",

View File

@ -59,6 +59,7 @@ class Log:
# Whether different parts of the output are enabled or disabled
self.output_explain = False
self.output_status = True
self.output_not_deleted = True
self.output_report = True
def _update_live(self) -> None:
@ -197,7 +198,7 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_explain:
self.print(f" {escape(text)}")
def status(self, style: str, action: str, text: str) -> None:
def status(self, style: str, action: str, text: str, suffix: str = "") -> None:
"""
Print a status update while crawling. Allows markup in the "style"
argument which will be applied to the "action" string.
@ -205,7 +206,18 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_status:
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def not_deleted(self, style: str, action: str, text: str, suffix: str = "") -> None:
"""
Print a message for a local only file that wasn't
deleted while crawling. Allows markup in the "style"
argument which will be applied to the "action" string.
"""
if self.output_status and self.output_not_deleted:
action = escape(f"{action:<{self.STATUS_WIDTH}}")
self.print(f"{style}{action}[/] {escape(text)} {suffix}")
def report(self, text: str) -> None:
"""
@ -215,6 +227,14 @@ directly or as a GitHub issue: https://github.com/Garmelon/PFERD/issues/new
if self.output_report:
self.print(text)
def report_not_deleted(self, text: str) -> None:
"""
Print a report for a local only file that wasn't deleted after crawling. Allows markup.
"""
if self.output_report and self.output_not_deleted:
self.print(text)
@contextmanager
def _bar(
self,

View File

@ -44,6 +44,7 @@ class OnConflict(Enum):
LOCAL_FIRST = "local-first"
REMOTE_FIRST = "remote-first"
NO_DELETE = "no-delete"
NO_DELETE_PROMPT_OVERWRITE = "no-delete-prompt-overwrite"
@staticmethod
def from_string(string: str) -> "OnConflict":
@ -51,7 +52,7 @@ class OnConflict(Enum):
return OnConflict(string)
except ValueError:
raise ValueError("must be one of 'prompt', 'local-first',"
" 'remote-first', 'no-delete'")
" 'remote-first', 'no-delete', 'no-delete-prompt-overwrite'")
@dataclass
@ -264,7 +265,7 @@ class OutputDirectory:
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict == OnConflict.PROMPT:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
async with log.exclusive_output():
prompt = f"Replace {fmt_path(path)} with remote file?"
return await prompt_yes_no(prompt, default=False)
@ -283,7 +284,7 @@ class OutputDirectory:
on_conflict: OnConflict,
path: PurePath,
) -> bool:
if on_conflict == OnConflict.PROMPT:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
async with log.exclusive_output():
prompt = f"Recursively delete {fmt_path(path)} and replace with remote file?"
return await prompt_yes_no(prompt, default=False)
@ -303,7 +304,7 @@ class OutputDirectory:
path: PurePath,
parent: PurePath,
) -> bool:
if on_conflict == OnConflict.PROMPT:
if on_conflict in {OnConflict.PROMPT, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
async with log.exclusive_output():
prompt = f"Delete {fmt_path(parent)} so remote file {fmt_path(path)} can be downloaded?"
return await prompt_yes_no(prompt, default=False)
@ -330,7 +331,7 @@ class OutputDirectory:
return False
elif on_conflict == OnConflict.REMOTE_FIRST:
return True
elif on_conflict == OnConflict.NO_DELETE:
elif on_conflict in {OnConflict.NO_DELETE, OnConflict.NO_DELETE_PROMPT_OVERWRITE}:
return False
# This should never be reached
@ -495,7 +496,7 @@ class OutputDirectory:
except OSError:
pass
else:
log.status("[bold bright_magenta]", "Not deleted", fmt_path(pure))
log.not_deleted("[bold bright_magenta]", "Not deleted", fmt_path(pure))
self._report.not_delete_file(pure)
def load_prev_report(self) -> None:

View File

@ -1,5 +1,5 @@
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Set
from rich.markup import escape
@ -43,16 +43,24 @@ class Pferd:
crawl_sections = [name for name, _ in config.crawl_sections()]
crawlers_to_run = [] # With crawl: prefix
crawlers_to_run = set() # With crawl: prefix
unknown_names = [] # Without crawl: prefix
for name in cli_crawlers:
section_name = f"crawl:{name}"
if section_name in crawl_sections:
log.explain(f"Crawler section named {section_name!r} exists")
crawlers_to_run.append(section_name)
else:
log.explain(f"There's no crawler section named {section_name!r}")
crawlers_to_run.add(section_name)
# interprete name as alias of a crawler
alias_names = self._find_crawlers_by_alias(name, config)
if alias_names:
crawlers_to_run.update(alias_names)
log.explain_topic(f"Crawler alias {name!r} found corresponding crawler sections:")
for alias_name in alias_names:
log.explain(f"Crawler section named {alias_name!r} with alias {name!r} exists")
if not section_name in crawl_sections and not alias_names:
log.explain(f"There's neither a crawler section named {section_name!r} nor does a crawler with alias {name!r} exist.")
unknown_names.append(name)
if unknown_names:
@ -65,6 +73,14 @@ class Pferd:
return crawlers_to_run
def _find_crawlers_by_alias(self, alias: str, config: Config) -> Set[str]:
alias_names = set()
for (section_name, section) in config.crawl_sections():
section_aliases = section.get("aliases", [])
if alias in section_aliases:
alias_names.add(section_name)
return alias_names
def _find_crawlers_to_run(
self,
config: Config,
@ -180,7 +196,7 @@ class Pferd:
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
for path in sorted(crawler.report.not_deleted_files):
something_changed = True
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
log.report_not_deleted(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True

View File

@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.4.0"
VERSION = "3.5.0"

View File

@ -30,7 +30,10 @@ The use of [venv](https://docs.python.org/3/library/venv.html) is recommended.
Unofficial packages are available for:
- [AUR](https://aur.archlinux.org/packages/pferd)
- [brew](https://formulae.brew.sh/formula/pferd)
- [conda-forge](https://github.com/conda-forge/pferd-feedstock)
- [nixpkgs](https://github.com/NixOS/nixpkgs/blob/master/pkgs/tools/misc/pferd/default.nix)
- [PyPi](https://pypi.org/project/pferd)
See also PFERD's [repology page](https://repology.org/project/pferd/versions).

27
flake.lock generated Normal file
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1694499547,
"narHash": "sha256-R7xMz1Iia6JthWRHDn36s/E248WB1/je62ovC/dUVKI=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e5f018cf150e29aac26c61dac0790ea023c46b24",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

41
flake.nix Normal file
View File

@ -0,0 +1,41 @@
{
description = "Tool for downloading course-related files from ILIAS";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-23.05";
};
outputs = { self, nixpkgs }:
let
# Helper function to generate an attrset '{ x86_64-linux = f "x86_64-linux"; ... }'.
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed;
in
{
packages = forAllSystems (system:
let pkgs = import nixpkgs { inherit system; };
in
rec {
default = pkgs.python3Packages.buildPythonApplication rec {
pname = "pferd";
# Performing black magic
# Don't worry, I sacrificed enough goats for the next few years
version = (pkgs.lib.importTOML ./PFERD/version.py).VERSION;
format = "pyproject";
src = ./.;
nativeBuildInputs = with pkgs.python3Packages; [
setuptools
];
propagatedBuildInputs = with pkgs.python3Packages; [
aiohttp
beautifulsoup4
rich
keyring
certifi
];
};
});
};
}