Compare commits


160 Commits

SHA1 Message Date
41cbcc509c Update version to 2.2.0 2020-07-15 22:47:44 +02:00
77874b432b Also add personal_desktop to download summary 2020-07-15 22:47:44 +02:00
5c4c785e60 Fix HTML file downloading
Previously PFERD thought any HTML file was an "Error, no access" page
when downloading. Now it checks whether ILIAS sends a
content-disposition header, telling the browser to download the file. If
that is the case, it was just an HTML file uploaded to ILIAS. If it has
no such header, it is probably an error message.
2020-07-15 15:12:14 +02:00
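
The fix described in this commit boils down to inspecting the response headers before deciding that HTML means "error page". A minimal sketch of that heuristic, assuming a plain requests response (the function name is illustrative, not PFERD's actual code):

```py
import requests


def looks_like_real_file(response: requests.Response) -> bool:
    """Treat HTML as an error page unless ILIAS asks the browser to download it."""
    is_html = response.headers.get("content-type", "").startswith("text/html")
    has_disposition = "content-disposition" in response.headers
    # Non-HTML responses are always files; HTML with a content-disposition
    # header is a legitimately uploaded HTML file rather than an error page.
    return (not is_html) or has_disposition
```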
2aed4f6d1f Only query the dir_filter for directories 2020-07-13 13:36:12 +02:00
34152fbe54 Set mtime and atime to ILIAS dates where possible 2020-07-13 13:29:18 +02:00
4047fe78f3 Fix README formatting 2020-07-11 18:22:33 +00:00
c28347122e Improve README
- Added a table of contents
- Reworked the transform section
- Fixed the commented example
2020-07-11 18:16:33 +00:00
5b38ab8cf1 Add MIT license 2020-07-08 09:46:27 +00:00
bb25d32f03 Fix typo in README 2020-06-29 16:18:33 +02:00
ecaedea709 Merge pull request #8 from pavelzw/master
Fix version number
2020-06-26 17:52:05 +02:00
f05d1b1261 Fix version number 2020-06-26 17:49:47 +02:00
6aaa3071f9 Update README with new version 2020-06-26 17:35:03 +02:00
c26c9352f1 Make DownloadSummary private, provide property accessors 2020-06-26 17:30:45 +02:00
d9ea688145 Use pretty logger for summaries 2020-06-26 17:24:36 +02:00
e8be6e498e Add summary to example_config_personal_desktop 2020-06-26 17:24:36 +02:00
e4b1fac045 Satisfy pylint 2020-06-26 15:38:22 +02:00
402ae81335 Fix type hints 2020-06-26 13:17:44 +00:00
52f31e2783 Add type hints to DownloadSummary 2020-06-26 13:02:37 +02:00
739522a151 Move download summary into a separate class 2020-06-25 23:07:11 +02:00
6c034209b6 Add deleted files to summary 2020-06-25 22:00:28 +02:00
f6fbd5e4bb Add download summary 2020-06-25 19:19:34 +02:00
7024db1f13 Use transient progress bar
This will ensure no pesky newline ends up in the output, even on
Windows.
2020-06-25 18:03:12 +02:00
23bfa42a0d Never use the direct download button, as it is currently broken 2020-06-11 13:31:01 +02:00
fdb57884ed Touch files with same content to update timestamps 2020-05-31 20:27:15 +02:00
f614b95a00 Adjust version in setup.py 2020-05-30 19:07:02 +02:00
8198c9ecaa Reorder methods a bit 2020-05-30 19:06:36 +02:00
086b15d10f Crawl a bit more iteratively 2020-05-30 15:47:15 +02:00
9d6ce331a5 Use IliasCrawlerEntry entries in the ilias scraper 2020-05-30 15:20:51 +02:00
821c7ade26 Move video url extraction logic to crawler 2020-05-30 00:22:31 +02:00
b969a1854a Remove unneeded whitespace 2020-05-30 00:22:31 +02:00
62535b4452 Unpack videos in ILIAS downloader 2020-05-21 22:12:52 +02:00
c0056e5669 Correctly crawl video pages with multiple pages 2020-05-21 21:38:07 +02:00
cfe4a8fc0a Bump version to 2.0.0 2020-05-15 11:26:23 +00:00
95b9248a25 Clean up 2020-05-15 11:26:09 +00:00
1004fa40f8 Add personal desktop example config to README 2020-05-15 11:02:55 +02:00
e8ddb0ca04 Fix example config link in README 2020-05-15 11:02:45 +02:00
36c8785f15 Add example config that synchronizes the personal desktop 2020-05-15 11:02:13 +02:00
03a801eecc Correctly type hint swallow_and_print_errors decorator 2020-05-12 21:03:53 +02:00
072c6630bf Avoid logging import in config 2020-05-12 18:19:23 +00:00
4f56c8f192 Pass element type to ilias directory filter 2020-05-12 14:41:13 +02:00
4fdb67128d Fetch correct diva playlist id 2020-05-11 00:25:34 +02:00
a0f9d31d94 Use PrettyLogger warning everywhere 2020-05-10 21:56:12 +02:00
e7b08420ba Warn when a marked file is added again 2020-05-10 21:42:30 +02:00
c1b21f7772 Only remove a progress task when we added it 2020-05-10 12:28:30 +02:00
9850ab1d73 Allow crawling the ILIAS Personal Desktop 2020-05-10 12:16:42 +02:00
9950144e97 Allow passing a playlist URL to diva instead of an id 2020-05-10 11:17:13 +02:00
f6faacabb0 Move FatalException to errors.py 2020-05-09 00:11:21 +02:00
19c1e3ac6f Fail on invalid ILIAS course ids 2020-05-09 00:11:20 +02:00
afa48c2d2d Swallow and print errors instead of crashing 2020-05-09 00:10:54 +02:00
a4c518bf4c Update date find regex 2020-05-08 22:17:58 +02:00
057135022f Try to accept that life sometimes is in English 2020-05-08 22:10:43 +02:00
755e9aa0d3 Try to add support for Shibboleth TFA token 2020-05-08 21:52:51 +02:00
c9deca19ca Remove walrus to lower needed python version 2020-05-08 21:21:33 +02:00
bb048c3a6d Apparently we want Python 3.8 2020-05-08 21:04:13 +02:00
33fcd307b2 Adjust install directions 2020-05-08 20:53:41 +02:00
a0c5572b59 Fix progress bars swallowing a line when they shouldn't 2020-05-08 19:55:53 +02:00
2d20d2934c Color warning differently 2020-05-08 19:52:45 +02:00
2c48ab66d4 Use rich for log colorization 2020-05-08 19:31:54 +02:00
104b838aed Automatically discover packages in setup.py 2020-05-08 18:58:44 +02:00
7f10931be8 Add rich to setup.py 2020-05-08 18:49:36 +02:00
07c225bc20 Expand README. I did not proofread this thing :( 2020-05-08 18:47:58 +02:00
56f2394001 Add a download progress bar 2020-05-08 17:09:56 +02:00
fdff8bc40e example_config: Change db transform 2020-05-01 13:31:29 +02:00
bee3d70998 Added a diva playlist downloader 2020-04-30 17:18:45 +02:00
42345ecc61 Demangle "Morgen" too 2020-04-30 12:05:25 +02:00
920d521d68 Change PrettyLogger.warn to PrettyLogger.warning 2020-04-25 20:11:51 +02:00
e0b46a306a Use warn method in IliasCrawler 2020-04-25 20:07:40 +02:00
8a42a2a396 Move logging into its own file 2020-04-25 20:02:01 +02:00
80247400a4 Debug log when starting an ilias download 2020-04-25 13:02:07 +02:00
13c5a29ff0 Fix and extend example config 2020-04-24 18:41:22 +00:00
1aaa6e7ab5 Use PathLike everywhere 2020-04-24 18:41:14 +00:00
7f53543324 Satisfy pylint and add todo 2020-04-24 18:26:28 +00:00
292e516297 Change crawler and downloader output 2020-04-24 18:24:44 +00:00
8258fa8919 Add test run option to PFERD 2020-04-24 18:00:21 +00:00
5b929f09a2 Move download strategies to downloader
Also fixes an issue where the downloader didn't mark files that were not
downloaded due to the strategy used.
2020-04-24 14:27:40 +00:00
4d32f863bc Clean up organizer after synchronizing 2020-04-24 14:17:23 +00:00
4e7333b396 Allow specifying paths as strings in Pferd 2020-04-24 11:50:40 +00:00
4c0e3b493a Use download_modified_or_new as default strategy 2020-04-24 13:48:06 +02:00
2de079a5d3 Add a few Transform combinators 2020-04-24 11:35:46 +00:00
509e624d47 Satisfy pylint. Useful docstrings? Not quite sure. 2020-04-23 20:35:59 +02:00
ca8fcf7a1d Somewhat elaborate example_config 2020-04-23 20:22:41 +02:00
980f69b5af Fix organizer marking itself causing an error 2020-04-23 20:02:05 +02:00
0b00a9c26b Log when starting to synchronize 2020-04-23 19:56:37 +02:00
1ef85c45e5 Switch Transform to PurePath 2020-04-23 17:40:43 +00:00
5ef5a56e69 Extract Location into separate file 2020-04-23 17:38:28 +00:00
f3f4be2690 More free functions 2020-04-23 19:21:49 +02:00
076b8c5a1f Add download strategies to save bandwidth
Only download files that are newer than the local version.
2020-04-23 18:29:20 +02:00
13bc78c889 Display reason for ignoring an element in ilias crawler 2020-04-23 13:54:58 +02:00
dc964a9d98 Remove finished TODOs 2020-04-23 13:30:34 +02:00
c2b14f3db9 ilias crawler: Use direct download link if possible 2020-04-23 13:08:12 +02:00
4b59a7c375 Move around TODOs 2020-04-23 10:49:01 +00:00
3a57430893 Fix type errors in example_config 2020-04-23 12:35:58 +02:00
bef210ae77 Rename and implement IliasDirectoryFilter 2020-04-23 12:35:18 +02:00
ea005517cf Only remove folders if they exist in tmpdir 2020-04-23 12:09:45 +02:00
3841f27aab Add example config 2020-04-23 09:50:32 +00:00
df0eb84a44 Fix TmpDir and Location
TmpDir: Clean up before and after, not just after
Location: Resolve path so that parent check works properly
2020-04-23 09:50:32 +00:00
2de4255a78 Add Pferd class 2020-04-23 09:50:32 +00:00
3c808879c9 Add Transforms and Transformables 2020-04-22 18:25:09 +00:00
a051e3bcca ilias crawler: Add some unhelpful documentation 2020-04-22 17:58:19 +02:00
eb7df036df WIP: ilias crawler: Also crawl assignments 2020-04-22 14:32:20 +02:00
23db59e733 WIP: ilias-crawler: Demangle dates 2020-04-22 12:58:44 +02:00
ac65b06a8e Satisfy pylint a bit 2020-04-22 01:37:34 +02:00
8891041069 WIP: crawler: Add opencast video crawler 2020-04-21 23:01:19 +02:00
70d63e3e90 WIP: Start small ILIAS crawler 2020-04-21 13:32:03 +02:00
b2a7af2e3e Store modification_date in IliasDownloadInfo, remove parameters 2020-04-21 13:31:50 +02:00
23bed48c8c Satisfy autopep8 2020-04-21 13:30:42 +02:00
0926d33798 Use downloader-specific data classes 2020-04-20 18:07:45 +00:00
55ba2f4070 Fix pylint in downloaders 2020-04-20 19:49:15 +02:00
d18b48aaf4 Stream in http downloader 2020-04-20 19:45:25 +02:00
4ef0ffe3bf Listen to pylint and mypy 2020-04-20 17:44:58 +00:00
ce77995c8f Rename http downloader module 2020-04-20 17:08:51 +00:00
ed9245c14d Remove old organizer 2020-04-20 18:50:23 +02:00
01e6972c96 Add ilias downloader 2020-04-20 18:49:01 +02:00
8181ae5b17 Guard http response in context manager 2020-04-20 18:47:46 +02:00
6407190ae0 Soupify requests responses properly 2020-04-20 16:38:30 +00:00
87395faac2 Add base for simple HTTP downloader 2020-04-20 17:43:59 +02:00
a9e6e7883d Create temp dir folder in constructor 2020-04-20 17:43:59 +02:00
154d6b29dd Listen to pylint 2020-04-20 15:16:22 +00:00
62ac569ec4 Revert "Add proposed crawler entry type"
This reverts commit 9f1a0a58ab.

Each crawler will have its own data class.
2020-04-20 16:59:20 +02:00
9f1a0a58ab Add proposed crawler entry type 2020-04-20 16:54:47 +02:00
879a2c7c80 Rewrite ILIAS authenticator 2020-04-20 14:26:30 +00:00
ff06c5215e Fix authenticator 2020-04-20 14:26:29 +00:00
135a8dce4b Fix resolve_path allowing paths outside its folder
This happened if the directory name was a prefix of the offending file name.
2020-04-20 16:07:14 +02:00
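
The underlying trap is that a plain string prefix check accepts `sync/foo-evil.pdf` for the base directory `sync/foo`. A small sketch of a prefix-safe check using `pathlib` semantics (an illustration, not the exact PFERD implementation):

```py
from pathlib import Path


def resolve_inside(base: Path, relative: str) -> Path:
    """Resolve `relative` against `base`, rejecting results that escape `base`."""
    resolved_base = base.resolve()
    candidate = (resolved_base / relative).resolve()
    try:
        # Compares whole path components, so "sync/foo-evil.pdf" is not
        # mistaken for a child of "sync/foo".
        candidate.relative_to(resolved_base)
    except ValueError:
        raise ValueError(f"{candidate} lies outside of {resolved_base}") from None
    return candidate
```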
63bbcad918 Add resolve method to tmp_dir 2020-04-20 15:40:07 +02:00
6584d6a905 Elaborate accept_file in new_organizer 2020-04-20 15:40:07 +02:00
5990098ef8 Add UserPassAuthenticator 2020-04-20 13:26:45 +00:00
f3d3d6bb65 Add some docs to cookie_jar 2020-04-20 14:38:03 +02:00
b2fe7cc064 Add preliminary logging to organizer and tmp_dir 2020-04-20 14:37:44 +02:00
930d821dd7 Add a simple organizer 2020-04-20 14:29:48 +02:00
5c2ff14839 Add "prompt_yes_no" to utils 2020-04-20 14:29:48 +02:00
a3d6dc7873 Clean up temp_folder 2020-04-20 14:29:48 +02:00
53ad1c924b Add cookie jar 2020-04-20 11:35:26 +00:00
8c431c7d81 Add a simple temporary folder 2020-04-20 12:08:52 +02:00
d5dd5aac06 Fix some mypy errors 2020-04-20 01:54:47 +00:00
7d48972967 Configure mypy 2020-04-19 19:50:17 +00:00
25043a4aaa Remove unnecessary files
Also document some plans for the new program structure in REWRITE.md
2020-04-19 19:49:43 +00:00
7ebeef5873 Clean up gitignore 2020-04-19 18:47:44 +00:00
9b658776ca Merge pull request #6 from I-Al-Istannen/master
Hack in support for TI exams
2020-03-01 23:09:32 +00:00
cf3553175f Add OS_Exams synchronizer 2020-02-27 14:51:29 +01:00
bf8b3cf9f7 Hack in support for TI exams
This just adds an additional crawl check for AlteKlausuren. This is not
present on the root site but at the suffix `/Klausuren`.
Example config:

```py
 # The "Klausur" needs to be copied verbatim!
ti.synchronize("Klausur", "sync dir name",
               transform=ro_19_klausur_transform, filter=ro_19_klausur_filter)
```
2020-02-24 20:58:27 +01:00
4a5600d5ce Merge pull request #5 from I-Al-Istannen/master
Lose 50 minutes of my life (and fix the TGI tut)
2019-12-12 11:52:05 +00:00
f5bc49160f Lose 50 minutes of my life (and fix the TGI tut) 2019-12-12 12:50:16 +01:00
fa5f82d312 Merge pull request #4 from I-Al-Istannen/master
[TGI] Add TGi tut
2019-11-18 22:12:32 +00:00
4433696509 [TGI] Add TGi tut 2019-11-18 09:58:16 +01:00
1f5475abc5 Merge pull request #3 from I-Al-Istannen/master
Download all TGI files and not just lectures
2019-10-17 21:10:21 +00:00
1407c6d264 Download all TGI files and not just lectures 2019-10-17 22:14:32 +02:00
e152bfc4a3 Merge pull request #2 from I-Al-Istannen/master
Add support for TGI
2019-10-15 20:01:10 +00:00
1973c931bd Add support for other years in TGI downloader 2019-10-15 15:37:52 +02:00
458cc1c6d6 Add support for TGI website 2019-10-15 15:34:59 +02:00
52852d11a6 Bump version to 1.1.8 2019-09-22 11:56:41 +00:00
f94629a7fa Fix exceptions with weird content types
(hopefully)
2019-09-22 11:55:47 +00:00
c8ee456d33 Bump version to 1.1.7 2019-07-26 08:14:55 +00:00
2752e98621 Fix relative url joining in ti downloader 2019-07-26 10:06:01 +02:00
1572e11da8 Bump version to 1.1.6 2019-07-05 08:49:26 +00:00
ea01dc7cb2 Allow even more types of files 2019-07-05 08:48:43 +00:00
aba8d46d26 Bump version to 1.1.5 2019-07-04 12:17:33 +00:00
77056e6f8d Allow more types of files 2019-07-04 12:16:42 +00:00
064f12c14c Ignore mypy files 2019-07-04 12:16:26 +00:00
2eb834afc3 Bump version to 1.1.4 2019-06-11 12:46:40 +00:00
d468a45662 Allow wolfram files 2019-06-11 12:42:55 +00:00
32 changed files with 2922 additions and 1133 deletions

.gitignore (18 lines changed)

@@ -1,12 +1,10 @@
# python stuff
__pycache__/
# venv stuff
bin/
include/
lib/
lib64
pyvenv.cfg
.venv/
venv/
.idea/
build/
.mypy_cache/
.tmp/
pip-selfcheck.json
.env
.vscode
ilias_cookies.txt

LICENSE (new file, 18 lines)

@@ -0,0 +1,18 @@
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@@ -1,37 +1,8 @@
import logging
# pylint: disable=invalid-name
from .ffm import *
from .ilias import *
from .norbert import *
from .ti import *
from .utils import *
"""
This module exports only what you need for a basic configuration. If you want a
more complex configuration, you need to import the other submodules manually.
"""
__all__ = ["STYLE", "FORMAT", "DATE_FORMAT", "FORMATTER", "enable_logging"]
__all__ += ffm.__all__
__all__ += ilias.__all__
__all__ += norbert.__all__
__all__ += ti.__all__
__all__ += utils.__all__
STYLE = "{"
FORMAT = "[{levelname:<7}] {message}"
DATE_FORMAT = "%F %T"
FORMATTER = logging.Formatter(
fmt=FORMAT,
datefmt=DATE_FORMAT,
style=STYLE,
)
def enable_logging(name="PFERD", level=logging.INFO):
handler = logging.StreamHandler()
handler.setFormatter(FORMATTER)
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(handler)
# This should be logged by our own handler, and not the root logger's
# default handler, so we don't pass it on to the root logger.
logger.propagate = False
from .pferd import Pferd

PFERD/authenticators.py (new file, 125 lines)

@@ -0,0 +1,125 @@
"""
General authenticators useful in many situations
"""
import getpass
from typing import Optional, Tuple
class TfaAuthenticator:
# pylint: disable=too-few-public-methods
"""
An authenticator for a TFA token. Always prompts the user, as the token can not be cached.
"""
def __init__(self, reason: str):
"""
Create a new tfa authenticator.
Arguments:
reason {str} -- the reason for obtaining the credentials
"""
self._reason = reason
def get_token(self) -> str:
# pylint: disable=no-self-use
"""
Prompts the user for the token and returns it.
"""
print(f"Enter credentials ({self._reason})")
return getpass.getpass("TFA Token: ")
class UserPassAuthenticator:
"""
An authenticator for username-password combinations that prompts the user
for missing information.
"""
def __init__(
self,
reason: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""
reason - what the credentials are used for
username - the username (if already known)
password - the password (if already known)
"""
self._reason = reason
self._given_username = username
self._given_password = password
self._username = username
self._password = password
def get_credentials(self) -> Tuple[str, str]:
"""
Returns a tuple (username, password). Prompts user for username or
password when necessary.
"""
if self._username is None and self._given_username is not None:
self._username = self._given_username
if self._password is None and self._given_password is not None:
self._password = self._given_password
if self._username is None or self._password is None:
print(f"Enter credentials ({self._reason})")
username: str
if self._username is None:
username = input("Username: ")
self._username = username
else:
username = self._username
password: str
if self._password is None:
password = getpass.getpass(prompt="Password: ")
self._password = password
else:
password = self._password
return (username, password)
@property
def username(self) -> str:
"""
The username. Accessing this property may cause the authenticator to
prompt the user.
"""
(username, _) = self.get_credentials()
return username
@property
def password(self) -> str:
"""
The password. Accessing this property may cause the authenticator to
prompt the user.
"""
(_, password) = self.get_credentials()
return password
def invalidate_credentials(self) -> None:
"""
Marks the credentials as invalid. If only a username was supplied in
the constructor, assumes that the username is valid and only the
password is invalid. If only a password was supplied in the
constructor, assumes that the password is valid and only the username
is invalid. Otherwise, assumes that username and password are both
invalid.
"""
self._username = None
self._password = None
if self._given_username is not None and self._given_password is not None:
self._given_username = None
self._given_password = None

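A short usage sketch for the authenticator added above; the reason string and username are placeholders:

```py
from PFERD.authenticators import UserPassAuthenticator

auth = UserPassAuthenticator("example service", username="jane")

# The password was not supplied, so the first access prompts for it once
# and caches it for later calls.
username, password = auth.get_credentials()

# After a rejected login: only the username was given in the constructor,
# so it is kept and only the password will be prompted for again.
auth.invalidate_credentials()
```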
PFERD/cookie_jar.py (new file, 69 lines)

@@ -0,0 +1,69 @@
"""A helper for requests cookies."""
import logging
from http.cookiejar import LoadError, LWPCookieJar
from pathlib import Path
from typing import Optional
import requests
LOGGER = logging.getLogger(__name__)
class CookieJar:
"""A cookie jar that can be persisted."""
def __init__(self, cookie_file: Optional[Path] = None) -> None:
"""Create a new cookie jar at the given path.
If the path is None, the cookies will not be persisted.
"""
self._cookies: LWPCookieJar
if cookie_file is None:
self._cookies = LWPCookieJar()
else:
self._cookies = LWPCookieJar(cookie_file)
@property
def cookies(self) -> LWPCookieJar:
"""Return the requests cookie jar."""
return self._cookies
def load_cookies(self) -> None:
"""Load all cookies from the file given in the constructor."""
if self._cookies.filename is None:
return
try:
LOGGER.info("Loading old cookies from %s", self._cookies.filename)
self._cookies.load(ignore_discard=True)
except (FileNotFoundError, LoadError):
LOGGER.warning(
"No valid cookie file found at %s, continuing with no cookies",
self._cookies.filename
)
def save_cookies(self, reason: Optional[str] = None) -> None:
"""Save the cookies in the file given in the constructor."""
if self._cookies.filename is None:
return
if reason is None:
LOGGER.info("Saving cookies")
else:
LOGGER.info("Saving cookies (%s)", reason)
# TODO figure out why ignore_discard is set
# TODO possibly catch a few more exceptions
self._cookies.save(ignore_discard=True)
def create_session(self) -> requests.Session:
"""Create a new session using the cookie jar."""
sess = requests.Session()
# From the request docs: "All requests code should work out of the box
# with externally provided instances of CookieJar, e.g. LWPCookieJar
# and FileCookieJar."
sess.cookies = self.cookies # type: ignore
return sess

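A minimal usage sketch of the cookie jar, assuming the cookie file name that appears elsewhere in the repository:

```py
from pathlib import Path

from PFERD.cookie_jar import CookieJar

cookie_jar = CookieJar(Path("ilias_cookies.txt"))
cookie_jar.load_cookies()               # a missing or invalid file only logs a warning
session = cookie_jar.create_session()   # requests.Session backed by this jar

# ... perform authenticated requests with `session` ...

cookie_jar.save_cookies("synchronization finished")
```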
PFERD/diva.py (new file, 169 lines)

@@ -0,0 +1,169 @@
"""
Utility functions and a scraper/downloader for the KIT DIVA portal.
"""
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, List, Optional
import requests
from .errors import FatalException
from .logging import PrettyLogger
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
@dataclass
class DivaDownloadInfo(Transformable):
"""
Information about a DIVA video
"""
url: str
DivaDownloadStrategy = Callable[[Organizer, DivaDownloadInfo], bool]
def diva_download_new(organizer: Organizer, info: DivaDownloadInfo) -> bool:
"""
Accepts only new files.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists():
return True
PRETTY.ignored_file(info.path, "local file exists")
return False
class DivaPlaylistCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for DIVA playlists.
"""
_PLAYLIST_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/detail/"
_COLLECTION_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/collection.json"
def __init__(self, playlist_id: str):
self._id = playlist_id
@classmethod
def fetch_id(cls, playlist_link: str) -> str:
"""
Fetches the ID for a playerlist, given the base link
(e.g. https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271).
Raises a FatalException, if the id can not be resolved
"""
match = re.match(r".+#/details/(.+)", playlist_link)
if match is None:
raise FatalException(
"DIVA: Invalid playlist link format, could not extract details."
)
base_name = match.group(1)
response = requests.get(cls._PLAYLIST_BASE_URL + base_name + ".json")
if response.status_code != 200:
raise FatalException(
f"DIVA: Got non-200 status code ({response.status_code}))"
f"when requesting {response.url!r}!"
)
body = response.json()
if body["error"]:
raise FatalException(f"DIVA: Server returned error {body['error']!r}.")
return body["result"]["collection"]["id"]
def crawl(self) -> List[DivaDownloadInfo]:
"""
Crawls the playlist given in the constructor.
"""
response = requests.get(self._COLLECTION_BASE_URL, params={"collection": self._id})
if response.status_code != 200:
raise FatalException(f"Server returned status {response.status_code}.")
body = response.json()
if body["error"]:
raise FatalException(f"Server returned error {body['error']!r}.")
result = body["result"]
if result["resultCount"] > result["pageSize"]:
PRETTY.warning("Did not receive all results, some will be missing")
download_infos: List[DivaDownloadInfo] = []
for video in result["resultList"]:
title = video["title"]
collection_title = self._follow_path(["collection", "title"], video)
url = self._follow_path(
["resourceList", "derivateList", "mp4", "url"],
video
)
if url and collection_title and title:
path = Path(collection_title, title + ".mp4")
download_infos.append(DivaDownloadInfo(path, url))
else:
PRETTY.warning(f"Incomplete video found: {title!r} {collection_title!r} {url!r}")
return download_infos
@staticmethod
def _follow_path(path: List[str], obj: Any) -> Optional[Any]:
"""
Follows a property path through an object, bailing at the first None.
"""
current = obj
for path_step in path:
if path_step in current:
current = current[path_step]
else:
return None
return current
class DivaDownloader:
"""
A downloader for DIVA videos.
"""
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: DivaDownloadStrategy):
self._tmp_dir = tmp_dir
self._organizer = organizer
self._strategy = strategy
self._session = requests.session()
def download_all(self, infos: List[DivaDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: DivaDownloadInfo) -> None:
"""
Download a single file.
"""
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
with self._session.get(info.url, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path)
else:
PRETTY.warning(f"Could not download file, got response {response.status_code}")

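A short sketch of resolving and crawling a DIVA playlist with the classes above; the playlist link is the example from the `fetch_id` docstring:

```py
from PFERD.diva import DivaPlaylistCrawler

# fetch_id() turns a share link into the playlist id the crawler needs.
playlist_id = DivaPlaylistCrawler.fetch_id(
    "https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271"
)
infos = DivaPlaylistCrawler(playlist_id).crawl()

for info in infos:
    print(info.path, info.url)
```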
PFERD/download_summary.py (new file, 69 lines)

@@ -0,0 +1,69 @@
"""
Provides a summary that keeps track of new modified or deleted files.
"""
from pathlib import Path
from typing import List
class DownloadSummary:
"""
Keeps track of all new, modified or deleted files and provides a summary.
"""
def __init__(self) -> None:
self._new_files: List[Path] = []
self._modified_files: List[Path] = []
self._deleted_files: List[Path] = []
@property
def new_files(self) -> List[Path]:
"""
Returns all new files.
"""
return self._new_files.copy()
@property
def modified_files(self) -> List[Path]:
"""
Returns all modified files.
"""
return self._modified_files.copy()
@property
def deleted_files(self) -> List[Path]:
"""
Returns all deleted files.
"""
return self._deleted_files.copy()
def merge(self, summary: 'DownloadSummary') -> None:
"""
Merges ourselves with the passed summary. Modifies this object, but not the passed one.
"""
self._new_files += summary.new_files
self._modified_files += summary.modified_files
self._deleted_files += summary.deleted_files
def add_deleted_file(self, path: Path) -> None:
"""
Registers a file as deleted.
"""
self._deleted_files.append(path)
def add_modified_file(self, path: Path) -> None:
"""
Registers a file as changed.
"""
self._modified_files.append(path)
def add_new_file(self, path: Path) -> None:
"""
Registers a file as new.
"""
self._new_files.append(path)
def has_updates(self) -> bool:
"""
Returns whether this summary has any updates.
"""
return bool(self._new_files or self._modified_files or self._deleted_files)

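A small sketch of how the summary is meant to be used; the file paths are made up:

```py
from pathlib import Path

from PFERD.download_summary import DownloadSummary

course_summary = DownloadSummary()
course_summary.add_new_file(Path("Lecture/slides01.pdf"))
course_summary.add_modified_file(Path("Lecture/slides02.pdf"))

total = DownloadSummary()
total.merge(course_summary)     # `total` absorbs the per-course entries
print(total.has_updates())      # True
print(total.new_files)          # contains Lecture/slides01.pdf
```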
PFERD/downloaders.py (new file, 72 lines)

@@ -0,0 +1,72 @@
"""
General downloaders useful in many situations
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import requests
import requests.auth
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
@dataclass
class HttpDownloadInfo(Transformable):
"""
This class describes a single file to be downloaded.
"""
url: str
parameters: Dict[str, Any] = field(default_factory=dict)
class HttpDownloader:
"""A HTTP downloader that can handle HTTP basic auth."""
def __init__(
self,
tmp_dir: TmpDir,
organizer: Organizer,
username: Optional[str],
password: Optional[str],
):
"""Create a new http downloader."""
self._organizer = organizer
self._tmp_dir = tmp_dir
self._username = username
self._password = password
self._session = self._build_session()
def _build_session(self) -> requests.Session:
session = requests.Session()
if self._username and self._password:
session.auth = requests.auth.HTTPBasicAuth(
self._username, self._password
)
return session
def download_all(self, infos: List[HttpDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: HttpDownloadInfo) -> None:
"""
Download a single file.
"""
with self._session.get(info.url, params=info.parameters, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path)
else:
# TODO use proper exception
raise Exception(f"Could not download file, got response {response.status_code}")

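A sketch of describing a single download target with the data class above; path, URL and parameters are placeholders, and the downloader construction is only hinted at because the `TmpDir` and `Organizer` setup is not part of this diff:

```py
from pathlib import Path

from PFERD.downloaders import HttpDownloadInfo

info = HttpDownloadInfo(
    path=Path("exercises/sheet01.pdf"),           # where the file should end up
    url="https://example.com/files/sheet01.pdf",  # where to fetch it from
    parameters={"lang": "de"},                    # extra query parameters for the GET
)

# downloader = HttpDownloader(tmp_dir, organizer, username, password)
# downloader.download_all([info])
```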
PFERD/errors.py (new file, 39 lines)

@@ -0,0 +1,39 @@
"""
An error logging decorator.
"""
import logging
from typing import Any, Callable, TypeVar, cast
from rich.console import Console
from .logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class FatalException(Exception):
"""
A fatal exception occurred. Recovery is not possible.
"""
TFun = TypeVar('TFun', bound=Callable[..., Any])
def swallow_and_print_errors(function: TFun) -> TFun:
"""
Decorates a function, swallows all errors, logs them and returns none if one occurred.
"""
def inner(*args: Any, **kwargs: Any) -> Any:
# pylint: disable=broad-except
try:
return function(*args, **kwargs)
except FatalException as error:
PRETTY.error(str(error))
return None
except Exception as error:
Console().print_exception()
return None
return cast(TFun, inner)

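A quick sketch of the decorator in use; the function name is illustrative:

```py
from PFERD.errors import FatalException, swallow_and_print_errors

@swallow_and_print_errors
def synchronize_course() -> None:
    # A FatalException is reported through the pretty logger; any other
    # exception is printed as a rich traceback. Neither crashes the caller.
    raise FatalException("Invalid course id")

synchronize_course()   # logs the error and returns None instead of raising
```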
@@ -1,61 +0,0 @@
# Fakultät für Mathematik (FfM)
import logging
import pathlib
import re
import bs4
import requests
from .organizer import Organizer
from .utils import stream_to_path, PrettyLogger
__all__ = ["FfM"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class FfM:
BASE_URL = "http://www.math.kit.edu/"
LINK_RE = re.compile(r"^https?://www.math.kit.edu/.*/(.*\.pdf)$")
def __init__(self, base_path):
self.base_path = base_path
self._session = requests.Session()
def synchronize(self, urlpart, to_dir, transform=lambda x: x):
pretty.starting_synchronizer(to_dir, "FfM", urlpart)
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
self._crawl(orga, urlpart, transform)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self, orga, urlpart, transform):
url = self.BASE_URL + urlpart
r = self._session.get(url)
soup = bs4.BeautifulSoup(r.text, "html.parser")
for found in soup.find_all("a", href=self.LINK_RE):
url = found["href"]
filename = re.match(self.LINK_RE, url).group(1).replace("/", ".")
logger.debug(f"Found file {filename} at {url}")
old_path = pathlib.PurePath(filename)
new_path = transform(old_path)
if new_path is None:
continue
logger.debug(f"Transformed from {old_path} to {new_path}")
temp_path = orga.temp_file()
self._download(url, temp_path)
orga.add_file(temp_path, new_path)
def _download(self, url, to_path):
with self._session.get(url, stream=True) as r:
stream_to_path(r, to_path)

@@ -1,109 +0,0 @@
# ILIAS
import logging
import pathlib
import re
from .ilias_authenticators import ShibbolethAuthenticator
from .organizer import Organizer
from .utils import PrettyLogger
__all__ = ["Ilias"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Ilias:
FILE_RE = re.compile(r"goto\.php\?target=(file_\d+_download)")
DIR_RE = re.compile(r"ilias\.php\?ref_id=(\d+)")
def __init__(self, base_path, cookie_file):
self.base_path = base_path
self._auth = ShibbolethAuthenticator(base_path / cookie_file)
def synchronize(self, ref_id, to_dir, transform=lambda x: x, filter=lambda x: True):
pretty.starting_synchronizer(to_dir, "ILIAS", f"ref_id {ref_id}")
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
files = self._crawl(pathlib.PurePath(), f"fold_{ref_id}", filter)
self._download(orga, files, transform)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self, dir_path, dir_id, filter_):
soup = self._auth.get_webpage(dir_id)
found_files = []
files = self._find_files(soup)
for (name, file_id) in files:
path = dir_path / name
found_files.append((path, file_id))
logger.debug(f"Found file {path}")
dirs = self._find_dirs(soup)
for (name, ref_id) in dirs:
path = dir_path / name
logger.debug(f"Found dir {path}")
if filter_(path):
logger.info(f"Searching {path}")
files = self._crawl(path, ref_id, filter_)
found_files.extend(files)
else:
logger.info(f"Not searching {path}")
return found_files
def _download(self, orga, files, transform):
for (path, file_id) in sorted(files):
to_path = transform(path)
if to_path is not None:
temp_path = orga.temp_file()
self._auth.download_file(file_id, temp_path)
orga.add_file(temp_path, to_path)
def _find_files(self, soup):
files = []
file_names = set()
found = soup.find_all("a", {"class": "il_ContainerItemTitle", "href": self.FILE_RE})
for element in found:
file_stem = element.string.strip().replace("/", ".")
file_type = element.parent.parent.parent.find("div", {"class": "il_ItemProperties"}).find("span").string.strip()
file_id = re.search(self.FILE_RE, element.get("href")).group(1)
file_name = f"{file_stem}.{file_type}"
if file_name in file_names:
counter = 1
while True:
file_name = f"{file_stem} (duplicate {counter}).{file_type}"
if file_name in file_names:
counter += 1
else:
break
files.append((file_name, file_id))
file_names.add(file_name)
return files
def _find_dirs(self, soup):
dirs = []
found = soup.find_all("div", {"class": "alert", "role": "alert"})
if found:
return []
found = soup.find_all("a", {"class": "il_ContainerItemTitle", "href": self.DIR_RE})
for element in found:
dir_name = element.string.strip().replace("/", ".")
ref_id = re.search(self.DIR_RE, element.get("href")).group(1)
dir_id = f"fold_{ref_id}"
dirs.append((dir_name, dir_id))
return dirs

PFERD/ilias/__init__.py (new file, 10 lines)

@@ -0,0 +1,10 @@
"""
Synchronizing files from ILIAS instances (https://www.ilias.de/).
"""
from .authenticators import IliasAuthenticator, KitShibbolethAuthenticator
from .crawler import (IliasCrawler, IliasCrawlerEntry, IliasDirectoryFilter,
IliasElementType)
from .downloader import (IliasDownloader, IliasDownloadInfo,
IliasDownloadStrategy, download_everything,
download_modified_or_new)

@@ -0,0 +1,131 @@
"""
Authenticators that can obtain proper ILIAS session cookies.
"""
import abc
import logging
from typing import Optional
import bs4
import requests
from ..authenticators import TfaAuthenticator, UserPassAuthenticator
from ..utils import soupify
LOGGER = logging.getLogger(__name__)
class IliasAuthenticator(abc.ABC):
# pylint: disable=too-few-public-methods
"""
An authenticator that logs an existing requests session into an ILIAS
account.
"""
@abc.abstractmethod
def authenticate(self, sess: requests.Session) -> None:
"""
Log a requests session into this authenticator's ILIAS account.
"""
class KitShibbolethAuthenticator(IliasAuthenticator):
# pylint: disable=too-few-public-methods
"""
Authenticate via KIT's shibboleth system.
"""
def __init__(self, username: Optional[str] = None, password: Optional[str] = None) -> None:
self._auth = UserPassAuthenticator("KIT ILIAS Shibboleth", username, password)
self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth")
def authenticate(self, sess: requests.Session) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receives.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
LOGGER.debug("Begin authentication process with ILIAS")
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
}
soup = soupify(sess.post(url, data=data))
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "form2", "method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth using credentials")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_username": self._auth.username,
"j_password": self._auth.password,
}
soup = soupify(sess.post(url, data=data))
if self._tfa_required(soup):
soup = self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
print("Incorrect credentials.")
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
LOGGER.debug("Redirect back to ILIAS with login information")
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
sess.post(url, data=data)
def _authenticate_tfa(
self,
session: requests.Session,
soup: bs4.BeautifulSoup
) -> bs4.BeautifulSoup:
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth with TFA token")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": self._tfa_auth.get_token()
}
return soupify(session.post(url, data=data))
@staticmethod
def _login_successful(soup: bs4.BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None

PFERD/ilias/crawler.py (new file, 598 lines)

@@ -0,0 +1,598 @@
"""
Contains an ILIAS crawler alongside helper functions.
"""
import datetime
import json
import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
urlunsplit)
import bs4
import requests
from ..errors import FatalException
from ..logging import PrettyLogger
from ..utils import soupify
from .authenticators import IliasAuthenticator
from .date_demangler import demangle_date
from .downloader import IliasDownloadInfo
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class IliasElementType(Enum):
"""
The type of an ilias element.
"""
REGULAR_FOLDER = "REGULAR_FOLDER"
VIDEO_FOLDER = "VIDEO_FOLDER"
EXERCISE_FOLDER = "EXERCISE_FOLDER"
REGULAR_FILE = "REGULAR_FILE"
VIDEO_FILE = "VIDEO_FILE"
FORUM = "FORUM"
EXTERNAL_LINK = "EXTERNAL_LINK"
def is_folder(self) -> bool:
"""
Returns whether this type is some kind of folder.
"""
return "FOLDER" in str(self.name)
IliasDirectoryFilter = Callable[[Path, IliasElementType], bool]
class IliasCrawlerEntry:
# pylint: disable=too-few-public-methods
"""
An ILIAS crawler entry used internally to find, catalogue and recursively crawl elements.
"""
def __init__(
self,
path: Path,
url: Union[str, Callable[[], Optional[str]]],
entry_type: IliasElementType,
modification_date: Optional[datetime.datetime]
):
self.path = path
if isinstance(url, str):
str_url = url
self.url: Callable[[], Optional[str]] = lambda: str_url
else:
self.url = url
self.entry_type = entry_type
self.modification_date = modification_date
def to_download_info(self) -> Optional[IliasDownloadInfo]:
"""
Converts this crawler entry to an IliasDownloadInfo, if possible.
This method will only succeed for *File* types.
"""
if self.entry_type in [IliasElementType.REGULAR_FILE, IliasElementType.VIDEO_FILE]:
return IliasDownloadInfo(self.path, self.url, self.modification_date)
return None
class IliasCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for ILIAS.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
base_url: str,
session: requests.Session,
authenticator: IliasAuthenticator,
dir_filter: IliasDirectoryFilter
):
"""
Create a new ILIAS crawler.
"""
self._base_url = base_url
self._session = session
self._authenticator = authenticator
self.dir_filter = dir_filter
@staticmethod
def _url_set_query_param(url: str, param: str, value: str) -> str:
"""
Set a query parameter in an url, overwriting existing ones with the same name.
"""
scheme, netloc, path, query, fragment = urlsplit(url)
query_parameters = parse_qs(query)
query_parameters[param] = [value]
new_query_string = urlencode(query_parameters, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
"""
Starts the crawl process for a course, yielding a list of elements to (potentially)
download.
Arguments:
course_id {str} -- the course id
Raises:
FatalException: if an unrecoverable error occurs or the course id is not valid
"""
# Start crawling at the given course
root_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
if not self._is_course_id_valid(root_url, course_id):
raise FatalException(
"Invalid course id? The URL the server returned did not contain my id."
)
# And treat it as a folder
entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), root_url)
return self._iterate_entries_to_download_infos(entries)
def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
response: requests.Response = self._session.get(root_url)
return course_id in response.url
def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
"""
Crawls the ILIAS personal desktop (and every subelements that can be reached from there).
Raises:
FatalException: if an unrecoverable error occurs
"""
entries: List[IliasCrawlerEntry] = self._crawl_folder(
Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI"
)
return self._iterate_entries_to_download_infos(entries)
def _iterate_entries_to_download_infos(
self,
entries: List[IliasCrawlerEntry]
) -> List[IliasDownloadInfo]:
result: List[IliasDownloadInfo] = []
entries_to_process: List[IliasCrawlerEntry] = entries.copy()
while len(entries_to_process) > 0:
entry = entries_to_process.pop()
if entry.entry_type == IliasElementType.EXTERNAL_LINK:
PRETTY.not_searching(entry.path, "external link")
continue
if entry.entry_type == IliasElementType.FORUM:
PRETTY.not_searching(entry.path, "forum")
continue
if entry.entry_type.is_folder() and not self.dir_filter(entry.path, entry.entry_type):
PRETTY.not_searching(entry.path, "user filter")
continue
download_info = entry.to_download_info()
if download_info is not None:
result.append(download_info)
continue
url = entry.url()
if url is None:
PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it")
continue
PRETTY.searching(entry.path)
if entry.entry_type == IliasElementType.EXERCISE_FOLDER:
entries_to_process += self._crawl_exercises(entry.path, url)
continue
if entry.entry_type == IliasElementType.REGULAR_FOLDER:
entries_to_process += self._crawl_folder(entry.path, url)
continue
if entry.entry_type == IliasElementType.VIDEO_FOLDER:
entries_to_process += self._crawl_video_directory(entry.path, url)
continue
return result
def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl all files in a folder-like element.
"""
soup = self._get_page(url, {})
result: List[IliasCrawlerEntry] = []
# Fetch all links and throw them to the general interpreter
links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
for link in links:
abs_url = self._abs_url_from_link(link)
element_path = Path(folder_path, link.getText().strip())
element_type = self._find_type_from_link(element_path, link, abs_url)
if element_type == IliasElementType.REGULAR_FILE:
result += self._crawl_file(folder_path, link, abs_url)
elif element_type is not None:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
else:
PRETTY.warning(f"Found element without a type at {str(element_path)!r}")
return result
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._base_url, link_tag.get("href"))
@staticmethod
def _find_type_from_link(
path: Path,
link_element: bs4.Tag,
url: str
) -> Optional[IliasElementType]:
"""
Decides which sub crawler to use for a given top level element.
"""
parsed_url = urlparse(url)
LOGGER.debug("Parsed url: %r", parsed_url)
# file URLs contain "target=file"
if "target=file_" in parsed_url.query:
return IliasElementType.REGULAR_FILE
# Skip forums
if "cmd=showThreads" in parsed_url.query:
return IliasElementType.FORUM
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query:
return IliasCrawler._find_type_from_folder_like(link_element, url)
PRETTY.warning(
"Got unknown element type in switch. I am not sure what horror I found on the"
f" ILIAS page. The element was at {str(path)!r} and it is {link_element!r})"
)
return None
@staticmethod
def _find_type_from_folder_like(link_element: bs4.Tag, url: str) -> Optional[IliasElementType]:
"""
Try crawling something that looks like a folder.
"""
# pylint: disable=too-many-return-statements
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
found_parent: Optional[bs4.Tag] = None
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
found_parent = parent
break
if found_parent is None:
PRETTY.warning(f"Could not find element icon for {url!r}")
return None
# Find the small descriptive icon to figure out the type
img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
PRETTY.warning(f"Could not find image tag for {url!r}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE_FOLDER
if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.EXTERNAL_LINK
if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM
return IliasElementType.REGULAR_FOLDER
@staticmethod
def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasCrawlerEntry]:
"""
Crawls a file.
"""
# Files have a list of properties (type, modification date, size, etc.)
# In a series of divs.
# Find the parent containing all those divs, so we can filter out what we need
properties_parent: bs4.Tag = link_element.findParent(
"div", {"class": lambda x: "il_ContainerListItem" in x}
).select_one(".il_ItemProperties")
# The first one is always the filetype
file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()
# The rest does not have a stable order. Grab the whole text and reg-ex the date
# out of it
all_properties_text = properties_parent.getText().strip()
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
all_properties_text
)
if modification_date_match is None:
modification_date = None
PRETTY.warning(f"Could not extract start date from {all_properties_text!r}")
else:
modification_date_str = modification_date_match.group(1)
modification_date = demangle_date(modification_date_str)
# Grab the name from the link text
name = link_element.getText()
full_path = Path(path, name + "." + file_type)
return [
IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date)
]
def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl the video overview site.
"""
initial_soup = self._get_page(url, {})
# The page is actually empty but contains a much needed token in the link below.
# That token can be used to fetch the *actual* video listing
content_link: bs4.Tag = initial_soup.select_one("#tab_series a")
# Fetch the actual video listing. The given parameters return all videos (max 800)
# in a standalone html page
video_list_soup = self._get_page(
self._abs_url_from_link(content_link),
{"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
# If we find a page selected, we probably need to respect pagination
if self._is_paginated_video_page(video_list_soup):
second_stage_url = self._abs_url_from_link(content_link)
return self._crawl_paginated_video_directory(
video_dir_path, video_list_soup, second_stage_url
)
return self._crawl_video_directory_second_stage(video_dir_path, video_list_soup)
@staticmethod
def _is_paginated_video_page(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id=re.compile(r"tab_page_sel.+")) is not None
def _crawl_paginated_video_directory(
self,
video_dir_path: Path,
paged_video_list_soup: bs4.BeautifulSoup,
second_stage_url: str
) -> List[IliasCrawlerEntry]:
LOGGER.info("Found paginated video page, trying 800 elements")
# Try to find the table id. This can be used to build the query parameter indicating
# you want 800 elements
table_element: bs4.Tag = paged_video_list_soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
if table_element is None:
PRETTY.warning(
"Could not increase elements per page (table not found)."
" Some might not be crawled!"
)
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if match is None:
PRETTY.warning(
"Could not increase elements per page (table id not found)."
" Some might not be crawled!"
)
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
table_id = match.group(1)
extended_video_page = self._get_page(
second_stage_url,
{f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
if self._is_paginated_video_page(extended_video_page):
PRETTY.warning(
"800 elements do not seem to be enough (or I failed to fetch that many)."
" I will miss elements."
)
return self._crawl_video_directory_second_stage(video_dir_path, extended_video_page)
def _crawl_video_directory_second_stage(
self,
video_dir_path: Path,
video_list_soup: bs4.BeautifulSoup
) -> List[IliasCrawlerEntry]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
direct_download_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Download\s*")
)
# Video start links are marked with an "Abspielen" link
video_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*")
)
results: List[IliasCrawlerEntry] = []
# We can download everything directly!
# FIXME: Sadly the download button is currently broken, so never do that
if False and len(direct_download_links) == len(video_links):
for link in direct_download_links:
results += self._crawl_single_video(video_dir_path, link, True)
else:
for link in video_links:
results += self._crawl_single_video(video_dir_path, link, False)
return results
def _crawl_single_video(
self,
parent_path: Path,
link: bs4.Tag,
direct_download: bool
) -> List[IliasCrawlerEntry]:
"""
Crawl a single video based on its "Abspielen" link from the video listing.
"""
# The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)"
).getText().strip()
modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
title = link.parent.parent.parent.select_one(
"td.std:nth-child(3)"
).getText().strip()
title += ".mp4"
video_path: Path = Path(parent_path, title)
video_url = self._abs_url_from_link(link)
# The video had a direct download button we can use instead
if direct_download:
LOGGER.debug("Using direct download for video %r", str(video_path))
return [IliasCrawlerEntry(
video_path, video_url, IliasElementType.VIDEO_FILE, modification_time
)]
return [IliasCrawlerEntry(
video_path,
self._crawl_video_url_from_play_link(video_url),
IliasElementType.VIDEO_FILE,
modification_time
)]
def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Optional[str]]:
def inner() -> Optional[str]:
# Fetch the actual video page. This is a small wrapper page initializing a javascript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
video_page_soup = soupify(self._session.get(play_url))
regex: re.Pattern = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(video_page_soup))
if json_match is None:
PRETTY.warning(f"Could not find json stream info for {play_url!r}")
return None
json_str = json_match.group(1)
# parse it
json_object = json.loads(json_str)
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return video_url
return inner
def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl files offered for download in exercises.
"""
soup = self._get_page(url, {})
results: List[IliasCrawlerEntry] = []
# Each assignment is in an accordion container
assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer")
for container in assignment_containers:
# Fetch the container name out of the header to use it in the path
container_name = container.select_one(".ilAssignmentHeader").getText().strip()
# Find all download links in the container (this will contain all the files)
files: List[bs4.Tag] = container.findAll(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
text="Download"
)
LOGGER.debug("Found exercise container %r", container_name)
# Grab each file as you now have the link
for file_link in files:
# Two divs, side by side. Left is the name, right is the link ==> get left
# sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip()
url = self._abs_url_from_link(file_link)
LOGGER.debug("Found file %r at %r", file_name, url)
results.append(IliasCrawlerEntry(
Path(element_path, container_name, file_name),
url,
IliasElementType.REGULAR_FILE,
None # We do not have any timestamp
))
return results
def _get_page(self, url: str, params: Dict[str, Any]) -> bs4.BeautifulSoup:
"""
Fetches a page from ILIAS, authenticating when needed.
"""
LOGGER.debug("Fetching %r", url)
response = self._session.get(url, params=params)
content_type = response.headers["content-type"]
if not content_type.startswith("text/html"):
raise FatalException(
f"Invalid content type {content_type} when crawling ilias page"
" {url!r} with {params!r}"
)
soup = soupify(response)
if self._is_logged_in(soup):
return soup
LOGGER.info("Not authenticated, changing that...")
self._authenticator.authenticate(self._session)
return self._get_page(url, params)
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
# Normal ILIAS pages
userlog = soup.find("li", {"id": "userlog"})
if userlog is not None:
LOGGER.debug("Auth: Found #userlog")
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
LOGGER.debug("Auth: Found #tbl_xoct.+")
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
LOGGER.debug("Auth: Found #playerContainer")
return True
return False

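A minimal sketch wiring the crawler to the KIT Shibboleth authenticator; the base URL comes from the authenticator module above, while the course id and the accept-everything directory filter are placeholders:

```py
import requests

from PFERD.ilias import IliasCrawler, KitShibbolethAuthenticator

session = requests.Session()
authenticator = KitShibbolethAuthenticator()    # prompts for KIT credentials on demand

crawler = IliasCrawler(
    "https://ilias.studium.kit.edu",
    session,
    authenticator,
    dir_filter=lambda path, element_type: True,  # descend into every folder
)

# "1234567" stands in for a real course id.
download_infos = crawler.crawl_course("1234567")
print(f"Found {len(download_infos)} downloadable files")
```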
@@ -0,0 +1,51 @@
"""
Helper methods to demangle an ILIAS date.
"""
import datetime
import locale
import logging
import re
from typing import Optional
from ..logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
def demangle_date(date: str) -> Optional[datetime.datetime]:
"""
Demangle a given date in one of the following formats:
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon.yyyy, HH:MM
"""
saved = locale.setlocale(locale.LC_ALL)
try:
try:
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
except locale.Error:
PRETTY.warning(
"Could not set language to german. Assuming you use english everywhere."
)
date = re.sub(r"\s+", " ", date)
date = re.sub("Gestern|Yesterday", _yesterday().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I)
return datetime.datetime.strptime(date, "%d. %b %Y, %H:%M")
except ValueError:
PRETTY.warning(f"Could not parse date {date!r}")
return None
finally:
locale.setlocale(locale.LC_ALL, saved)
def _yesterday() -> datetime.date:
return datetime.date.today() - datetime.timedelta(days=1)
def _tomorrow() -> datetime.date:
return datetime.date.today() + datetime.timedelta(days=1)

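A few example calls for the demangler above; the exact result of the first call assumes a German locale is available, since the code falls back to English month names otherwise:

```py
from PFERD.ilias.date_demangler import demangle_date

print(demangle_date("12. Mai 2020, 16:30"))   # datetime(2020, 5, 12, 16, 30)
print(demangle_date("Heute, 10:15"))          # today's date at 10:15
print(demangle_date("not a date"))            # None, after logging a warning
```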
PFERD/ilias/downloader.py (new file, 157 lines)

@@ -0,0 +1,157 @@
"""Contains a downloader for ILIAS."""
import datetime
import logging
import math
import os
from pathlib import Path, PurePath
from typing import Callable, List, Optional, Union
import bs4
import requests
from ..logging import PrettyLogger
from ..organizer import Organizer
from ..tmp_dir import TmpDir
from ..transform import Transformable
from ..utils import soupify, stream_to_path
from .authenticators import IliasAuthenticator
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class ContentTypeException(Exception):
"""Thrown when the content type of the ilias element can not be handled."""
class IliasDownloadInfo(Transformable):
"""
This class describes a single file to be downloaded.
"""
def __init__(
self,
path: PurePath,
url: Union[str, Callable[[], Optional[str]]],
modification_date: Optional[datetime.datetime]
):
super().__init__(path)
if isinstance(url, str):
string_url = url
self.url: Callable[[], Optional[str]] = lambda: string_url
else:
self.url = url
self.modification_date = modification_date
IliasDownloadStrategy = Callable[[Organizer, IliasDownloadInfo], bool]
def download_everything(organizer: Organizer, info: IliasDownloadInfo) -> bool:
# pylint: disable=unused-argument
"""
Accepts everything.
"""
return True
def download_modified_or_new(organizer: Organizer, info: IliasDownloadInfo) -> bool:
"""
Accepts new files or files with a more recent modification date.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists() or info.modification_date is None:
return True
resolved_mod_time_seconds = resolved_file.stat().st_mtime
# Download if the info is newer
if info.modification_date.timestamp() > resolved_mod_time_seconds:
return True
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
return False
class IliasDownloader:
# pylint: disable=too-many-arguments
"""A downloader for ILIAS."""
def __init__(
self,
tmp_dir: TmpDir,
organizer: Organizer,
session: requests.Session,
authenticator: IliasAuthenticator,
strategy: IliasDownloadStrategy,
):
"""
Create a new IliasDownloader.
"""
self._tmp_dir = tmp_dir
self._organizer = organizer
self._session = session
self._authenticator = authenticator
self._strategy = strategy
def download_all(self, infos: List[IliasDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: IliasDownloadInfo) -> None:
"""
Download a file from ILIAS.
Retries authentication indefinitely if the file could not be fetched.
"""
LOGGER.debug("Downloading %r", info)
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
tmp_file = self._tmp_dir.new_path()
while not self._try_download(info, tmp_file):
LOGGER.info("Retrying download: %r", info)
self._authenticator.authenticate(self._session)
dst_path = self._organizer.accept_file(tmp_file, info.path)
if dst_path and info.modification_date:
os.utime(
dst_path,
times=(
math.ceil(info.modification_date.timestamp()),
math.ceil(info.modification_date.timestamp())
)
)
def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
url = info.url()
if url is None:
PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/")
return True
with self._session.get(url, stream=True) as response:
content_type = response.headers["content-type"]
has_content_disposition = "content-disposition" in response.headers
if content_type.startswith("text/html") and not has_content_disposition:
if self._is_logged_in(soupify(response)):
raise ContentTypeException("Attempting to download a web page, not a file")
return False
# Yay, we got the file :)
stream_to_path(response, target, info.path.name)
return True
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
userlog = soup.find("li", {"id": "userlog"})
return userlog is not None

@@ -1,176 +0,0 @@
# This file is called IliasAuthenticators because there are multiple mechanisms
# for authenticating with Ilias (even though only the Shibboleth is currently
# implemented). Most of what the ShibbolethAuthenticator currently does is
# not Shibboleth specific; this mess would have to be cleaned up before
# actually implementing any other authentication method.
#
# I think the only other method is the password prompt when clicking the log in
# button.
import getpass
import http.cookiejar
import logging
import time
import bs4
import requests
from .utils import ContentTypeException, stream_to_path
__all__ = ["ShibbolethAuthenticator"]
logger = logging.getLogger(__name__)
class ShibbolethAuthenticator:
ILIAS_GOTO = "https://ilias.studium.kit.edu/goto.php"
ALLOWED_CONTENT_TYPES = [
"application/pdf",
"application/zip",
"application/msword",
"text/xml",
"text/plain",
"image/jpeg",
"image/png",
]
def __init__(self, cookie_file) -> None:
# Because LWPCookieJar insists on the path being str-like instead of
# Path-like.
cookie_file = str(cookie_file)
cookies = http.cookiejar.LWPCookieJar(cookie_file)
try:
logger.info(f"Loading old cookies from {cookie_file!r}")
cookies.load(ignore_discard=True)
except (FileNotFoundError, http.cookiejar.LoadError):
logger.warn(f"No (valid) cookie file found at {cookie_file!r}, ignoring...")
self._session = requests.Session()
self._session.cookies = cookies
def _authenticate(self):
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receives.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
logger.debug("Begin authentication process with ILIAS")
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
}
r = self._session.post(url, data=data)
soup = bs4.BeautifulSoup(r.text, "html.parser")
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "form2", "method": "post"})
action = form["action"]
print("Please enter Shibboleth credentials.")
username = getpass.getpass(prompt="Username: ")
password = getpass.getpass(prompt="Password: ")
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
logger.debug("Attempt to log in to Shibboleth using credentials")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
}
r = self._session.post(url, data=data)
soup = bs4.BeautifulSoup(r.text, "html.parser")
if not self._login_successful(soup):
print("Incorrect credentials.")
# Saving progress
logger.info("Saving cookies (successfully authenticated with Shibboleth)")
self._session.cookies.save(ignore_discard=True)
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
logger.debug("Redirect back to ILIAS with login information")
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
self._session.post(url, data=data)
# Saving progress
logger.info("Saving cookies (successfully authenticated with ILIAS)")
self._session.cookies.save(ignore_discard=True)
def _login_successful(self, soup):
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
def _is_logged_in(self, soup):
userlog = soup.find("li", {"id": "userlog"})
return userlog is not None
def get_webpage(self, object_id):
params = {"target": object_id}
while True:
logger.debug(f"Getting {self.ILIAS_GOTO} {params}")
r = self._session.get(self.ILIAS_GOTO, params=params)
soup = bs4.BeautifulSoup(r.text, "html.parser")
if self._is_logged_in(soup):
return soup
else:
logger.info("Not logged in, authenticating...")
self._authenticate()
def get_webpage_by_refid(self, ref_id):
return self.get_webpage(f"fold_{ref_id}")
def _download(self, url, params, to_path):
with self._session.get(url, params=params, stream=True) as r:
content_type = r.headers["content-type"]
if content_type in self.ALLOWED_CONTENT_TYPES:
# Yay, we got the file :)
stream_to_path(r, to_path)
return True
elif content_type == "text/html":
# Dangit, we're probably not logged in.
soup = bs4.BeautifulSoup(r.text, "html.parser")
if self._is_logged_in(soup):
raise ContentTypeException(
"Attempting to download a web page, not a file")
return False
else:
# What *did* we get?
raise ContentTypeException(
f"Unknown file of type {content_type}")
def download_file(self, file_id, to_path):
params = {"target": file_id}
while True:
success = self._download(self.ILIAS_GOTO, params, to_path)
if success:
return
else:
logger.info("Not logged in, authenticating...")
self._authenticate()

PFERD/location.py Normal file
@@ -0,0 +1,41 @@
"""
Contains a Location class for objects with an inherent path.
"""
from pathlib import Path, PurePath
class ResolveException(Exception):
"""An exception while resolving a file."""
# TODO take care of this when doing exception handling
class Location:
"""
An object that has an inherent path.
"""
def __init__(self, path: Path):
self._path = path.resolve()
@property
def path(self) -> Path:
"""
This object's location.
"""
return self._path
def resolve(self, target: PurePath) -> Path:
"""
Resolve a file relative to the path of this location.
Raises a [ResolveException] if the file is outside the given directory.
"""
absolute_path = self.path.joinpath(target).resolve()
# TODO Make this less inefficient
if self.path not in absolute_path.parents:
raise ResolveException(f"Path {target} is not inside directory {self.path}")
return absolute_path

PFERD/logging.py Normal file
@@ -0,0 +1,187 @@
"""
Contains a few logger utility functions and implementations.
"""
import logging
from pathlib import Path
from typing import List, Optional
from rich import print as rich_print
from rich._log_render import LogRender
from rich.console import Console
from rich.panel import Panel
from rich.style import Style
from rich.text import Text
from rich.theme import Theme
from .download_summary import DownloadSummary
from .utils import PathLike, to_path
STYLE = "{"
FORMAT = "[{levelname:<7}] {message}"
DATE_FORMAT = "%F %T"
def enable_logging(name: str = "PFERD", level: int = logging.INFO) -> None:
"""
Enable and configure logging via the logging module.
"""
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(RichLoggingHandler(level=level))
# This should be logged by our own handler, and not the root logger's
# default handler, so we don't pass it on to the root logger.
logger.propagate = False
class RichLoggingHandler(logging.Handler):
"""
A logging handler that uses rich for highlighting
"""
def __init__(self, level: int) -> None:
super().__init__(level=level)
self.console = Console(theme=Theme({
"logging.level.warning": Style(color="yellow")
}))
self._log_render = LogRender(show_level=True, show_time=False, show_path=False)
def emit(self, record: logging.LogRecord) -> None:
"""
Invoked by logging.
"""
log_style = f"logging.level.{record.levelname.lower()}"
message = self.format(record)
level = Text()
level.append(record.levelname, log_style)
message_text = Text.from_markup(message)
self.console.print(
self._log_render(
self.console,
[message_text],
level=level,
)
)
class PrettyLogger:
"""
A logger that prints some specially formatted log messages in color.
"""
def __init__(self, logger: logging.Logger) -> None:
self.logger = logger
@staticmethod
def _format_path(path: PathLike) -> str:
return repr(str(to_path(path)))
def error(self, message: str) -> None:
"""
Print an error message indicating some operation fatally failed.
"""
self.logger.error(
f"[bold red]{message}[/bold red]"
)
def warning(self, message: str) -> None:
"""
Print a warning message indicating some operation failed, but the error can be recovered
or ignored.
"""
self.logger.warning(
f"[bold yellow]{message}[/bold yellow]"
)
def modified_file(self, path: PathLike) -> None:
"""
An existing file has changed.
"""
self.logger.info(
f"[bold magenta]Modified {self._format_path(path)}.[/bold magenta]"
)
def new_file(self, path: PathLike) -> None:
"""
A new file has been downloaded.
"""
self.logger.info(
f"[bold green]Created {self._format_path(path)}.[/bold green]"
)
def deleted_file(self, path: PathLike) -> None:
"""
A file has been deleted.
"""
self.logger.info(
f"[bold red]Deleted {self._format_path(path)}.[/bold red]"
)
def ignored_file(self, path: PathLike, reason: str) -> None:
"""
File was not downloaded or modified.
"""
self.logger.info(
f"[dim]Ignored {self._format_path(path)} "
f"([/dim]{reason}[dim]).[/dim]"
)
def searching(self, path: PathLike) -> None:
"""
A crawler searches a particular object.
"""
self.logger.info(f"Searching {self._format_path(path)}")
def not_searching(self, path: PathLike, reason: str) -> None:
"""
A crawler does not search a particular object.
"""
self.logger.info(
f"[dim]Not searching {self._format_path(path)} "
f"([/dim]{reason}[dim]).[/dim]"
)
def summary(self, download_summary: DownloadSummary) -> None:
"""
Prints a download summary.
"""
self.logger.info("")
self.logger.info("[bold cyan]Download Summary[/bold cyan]")
if not download_summary.has_updates():
self.logger.info("[bold dim]Nothing changed![/bold dim]")
return
for new_file in download_summary.new_files:
self.new_file(new_file)
for modified_file in download_summary.modified_files:
self.modified_file(modified_file)
for deleted_files in download_summary.deleted_files:
self.deleted_file(deleted_files)
def starting_synchronizer(
self,
target_directory: PathLike,
synchronizer_name: str,
subject: Optional[str] = None,
) -> None:
"""
A special message marking that a synchronizer has been started.
"""
subject_str = f"{subject} " if subject else ""
self.logger.info("")
self.logger.info((
f"[bold cyan]Synchronizing "
f"{subject_str}to {self._format_path(target_directory)} "
f"using the {synchronizer_name} synchronizer.[/bold cyan]"
))

@@ -1,108 +0,0 @@
# Norberts Prog-Tuts
import logging
import pathlib
import re
import zipfile
import bs4
import requests
from .organizer import Organizer
from .utils import rename, stream_to_path, PrettyLogger
__all__ = ["Norbert"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Norbert:
BASE_URL = "https://studwww.informatik.kit.edu/~s_blueml/"
LINK_RE = re.compile(r"^progtut/.*/(.*\.zip)$")
def __init__(self, base_path):
self.base_path = base_path
self._session = requests.Session()
def synchronize(self, to_dir, transform=lambda x: x, unzip=lambda _: True):
pretty.starting_synchronizer(to_dir, "Norbert")
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
files = self._crawl()
self._download(orga, files, transform, unzip)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self):
url = self.BASE_URL
r = self._session.get(url)
# replace undecodeable characters with a placeholder
#text = r.raw.decode("utf-8", "replace")
text = r.text
soup = bs4.BeautifulSoup(text, "html.parser")
files = []
for found in soup.find_all("a", href=self.LINK_RE):
url = found["href"]
full_url = self.BASE_URL + url
filename = re.search(self.LINK_RE, url).group(1)
path = pathlib.PurePath(filename)
logger.debug(f"Found zip file {filename} at {full_url}")
files.append((path, full_url))
return files
def _download(self, orga, files, transform, unzip):
for path, url in sorted(files):
# Yes, we want the zip file contents
if unzip(path):
logger.debug(f"Downloading and unzipping {path}")
zip_path = rename(path, path.stem)
# Download zip file
temp_file = orga.temp_file()
self._download_zip(url, temp_file)
# Search the zip file for files to extract
temp_dir = orga.temp_dir()
with zipfile.ZipFile(temp_file, "r") as zf:
for info in zf.infolist():
# Only interested in the files themselves, the directory
# structure is created automatically by orga.add_file()
if info.is_dir():
continue
file_path = zip_path / pathlib.PurePath(info.filename)
logger.debug(f"Found {info.filename} at path {file_path}")
new_path = transform(file_path)
if new_path is not None:
# Extract to temp file and add, the usual deal
temp_file = orga.temp_file()
extracted_path = zf.extract(info, temp_dir)
extracted_path = pathlib.Path(extracted_path)
orga.add_file(extracted_path, new_path)
# No, we only want the zip file itself
else:
logger.debug(f"Only downloading {path}")
new_path = transform(path)
if new_path is not None:
temp_file = orga.temp_file()
self._download_zip(url, temp_file)
orga.add_file(temp_file, new_path)
def _download_zip(self, url, to_path):
with self._session.get(url, stream=True) as r:
stream_to_path(r, to_path)

@@ -1,151 +1,137 @@
"""A simple helper for managing downloaded files.
An organizer is bound to a single directory.
"""
import filecmp
import logging
import pathlib
import shutil
from pathlib import Path, PurePath
from typing import List, Optional, Set
from . import utils
from .download_summary import DownloadSummary
from .location import Location
from .logging import PrettyLogger
from .utils import prompt_yes_no
__all__ = ["Organizer"]
logger = logging.getLogger(__name__)
pretty = utils.PrettyLogger(logger)
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class Organizer:
def __init__(self, base_dir, sync_dir):
class FileAcceptException(Exception):
"""An exception while accepting a file."""
class Organizer(Location):
"""A helper for managing downloaded files."""
def __init__(self, path: Path):
"""Create a new organizer for a given path."""
super().__init__(path)
self._known_files: Set[Path] = set()
# Keep the root dir
self._known_files.add(path.resolve())
self.download_summary = DownloadSummary()
def accept_file(self, src: Path, dst: PurePath) -> Optional[Path]:
"""
base_dir - the .tmp directory will be created here
sync_dir - synced files will be moved here
Both are expected to be concrete pathlib paths.
Move a file to this organizer and mark it.
Returns the path the file was moved to, to allow the caller to adjust the metadata.
As you might still need to adjust the metadata when the file was identical
(e.g. update the timestamp), the path is also returned in this case.
In all other cases (ignored, not overwritten, etc.) this method returns None.
"""
src_absolute = src.resolve()
dst_absolute = self.resolve(dst)
self._base_dir = base_dir
self._sync_dir = sync_dir
if not src_absolute.exists():
raise FileAcceptException("Source file does not exist")
self._temp_dir = pathlib.Path(self._base_dir, ".tmp")
self._temp_nr = 0
if not src_absolute.is_file():
raise FileAcceptException("Source is a directory")
# check if base/sync dir exist?
LOGGER.debug("Copying %s to %s", src_absolute, dst_absolute)
self._added_files = set()
if self._is_marked(dst):
PRETTY.warning(f"File {str(dst_absolute)!r} was already written!")
if not prompt_yes_no(f"Overwrite file?", default=False):
PRETTY.ignored_file(dst_absolute, "file was written previously")
return None
def clean_temp_dir(self):
if self._temp_dir.exists():
shutil.rmtree(self._temp_dir)
self._temp_dir.mkdir(exist_ok=True)
logger.debug(f"Cleaned temp dir: {self._temp_dir}")
def temp_dir(self):
nr = self._temp_nr
self._temp_nr += 1
temp_dir = pathlib.Path(self._temp_dir, f"{nr:08}").resolve()
logger.debug(f"Produced new temp dir: {temp_dir}")
return temp_dir
def temp_file(self):
# generate the path to a new temp file in base_path/.tmp/
# make sure no two paths are the same
nr = self._temp_nr
self._temp_nr += 1
temp_file = pathlib.Path(self._temp_dir, f"{nr:08}.tmp").resolve()
logger.debug(f"Produced new temp file: {temp_file}")
return temp_file
def add_file(self, from_path, to_path):
if not from_path.exists():
raise utils.FileNotFoundException(f"Could not add file at {from_path}")
# check if sync_dir/to_path is inside sync_dir?
to_path = pathlib.Path(self._sync_dir, to_path)
if to_path.exists() and to_path.is_dir():
if self._prompt_yes_no(f"Overwrite folder {to_path} with file?", default=False):
shutil.rmtree(to_path)
# Destination file is directory
if dst_absolute.exists() and dst_absolute.is_dir():
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
shutil.rmtree(dst_absolute)
else:
logger.warn(f"Could not add file {to_path}")
return
PRETTY.warning(f"Could not add file {str(dst_absolute)!r}")
return None
if to_path.exists():
if filecmp.cmp(from_path, to_path, shallow=False):
pretty.ignored_file(to_path)
# Destination file exists
if dst_absolute.exists() and dst_absolute.is_file():
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
# Bail out, nothing more to do
PRETTY.ignored_file(dst_absolute, "same file contents")
self.mark(dst)
return dst_absolute
# remember path for later reference
self._added_files.add(to_path.resolve())
logger.debug(f"Added file {to_path.resolve()}")
# No further action needed, especially not overwriting symlinks...
return
self.download_summary.add_modified_file(dst_absolute)
PRETTY.modified_file(dst_absolute)
else:
pretty.modified_file(to_path)
self.download_summary.add_new_file(dst_absolute)
PRETTY.new_file(dst_absolute)
# Create parent dir if needed
dst_parent_dir: Path = dst_absolute.parent
dst_parent_dir.mkdir(exist_ok=True, parents=True)
# Move file
shutil.move(str(src_absolute), str(dst_absolute))
self.mark(dst)
return dst_absolute
def mark(self, path: PurePath) -> None:
"""Mark a file as used so it will not get cleaned up."""
absolute_path = self.resolve(path)
self._known_files.add(absolute_path)
LOGGER.debug("Tracked %s", absolute_path)
def _is_marked(self, path: PurePath) -> bool:
"""
Checks whether a file is marked.
"""
absolute_path = self.resolve(path)
return absolute_path in self._known_files
def cleanup(self) -> None:
"""Remove all untracked files in the organizer's dir."""
LOGGER.debug("Deleting all untracked files...")
self._cleanup(self.path)
def _cleanup(self, start_dir: Path) -> None:
paths: List[Path] = list(start_dir.iterdir())
# Recursively clean paths
for path in paths:
if path.is_dir():
self._cleanup(path)
else:
pretty.new_file(to_path)
if path.resolve() not in self._known_files:
self._delete_file_if_confirmed(path)
# copy the file from from_path to sync_dir/to_path
# If the file being replaced was a symlink, the link itself is overwritten,
# not the file the link points to.
to_path.parent.mkdir(parents=True, exist_ok=True)
from_path.replace(to_path)
logger.debug(f"Moved {from_path} to {to_path}")
# Delete dir if it was empty and untracked
dir_empty = len(list(start_dir.iterdir())) == 0
if start_dir.resolve() not in self._known_files and dir_empty:
start_dir.rmdir()
# remember path for later reference, after the new file was written
# This is necessary here because otherwise, resolve() would resolve the symlink too.
self._added_files.add(to_path.resolve())
logger.debug(f"Added file {to_path.resolve()}")
def _delete_file_if_confirmed(self, path: Path) -> None:
prompt = f"Do you want to delete {path}"
def clean_sync_dir(self):
self._clean_dir(self._sync_dir, remove_parent=False)
logger.debug(f"Cleaned sync dir: {self._sync_dir}")
def _clean_dir(self, path, remove_parent=True):
for child in sorted(path.iterdir()):
logger.debug(f"Looking at {child.resolve()}")
if child.is_dir():
self._clean_dir(child, remove_parent=True)
elif child.resolve() not in self._added_files:
if self._prompt_yes_no(f"Delete {child}?", default=False):
child.unlink()
logger.debug(f"Deleted {child}")
if remove_parent:
try:
path.rmdir()
except OSError: # directory not empty
pass
def _prompt_yes_no(self, question, default=None):
if default is True:
prompt = "[Y/n]"
elif default is False:
prompt = "[y/N]"
else:
prompt = "[y/n]"
text = f"{question} {prompt} "
WRONG_REPLY = "Please reply with 'yes'/'y' or 'no'/'n'."
while True:
response = input(text).strip().lower()
if response in {"yes", "ye", "y"}:
return True
elif response in {"no", "n"}:
return False
elif response == "":
if default is None:
print(WRONG_REPLY)
else:
return default
else:
print(WRONG_REPLY)
# How to use:
#
# 1. Before downloading any files
# orga = Organizer("/home/user/sync/", "/home/user/sync/bookstore/")
# orga.clean_temp_dir()
#
# 2. Downloading a file
# tempfile = orga.temp_file()
# download_something_to(tempfile)
# orga.add_file(tempfile, "books/douglas_adams/hhgttg"
#
# 3. After downloading all files
# orga.clean_sync_dir()
# orga.clean_temp_dir()
if prompt_yes_no(prompt, False):
self.download_summary.add_deleted_file(path)
path.unlink()

PFERD/pferd.py Normal file
@@ -0,0 +1,281 @@
"""
Convenience functions for using PFERD.
"""
import logging
from pathlib import Path
from typing import Callable, List, Optional, Union
from .cookie_jar import CookieJar
from .diva import (DivaDownloader, DivaDownloadStrategy, DivaPlaylistCrawler,
diva_download_new)
from .download_summary import DownloadSummary
from .errors import FatalException, swallow_and_print_errors
from .ilias import (IliasAuthenticator, IliasCrawler, IliasDirectoryFilter,
IliasDownloader, IliasDownloadInfo, IliasDownloadStrategy,
KitShibbolethAuthenticator, download_modified_or_new)
from .location import Location
from .logging import PrettyLogger, enable_logging
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import TF, Transform, apply_transform
from .utils import PathLike, to_path
# TODO save known-good cookies as soon as possible
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class Pferd(Location):
# pylint: disable=too-many-arguments
"""
The main entrypoint in your Pferd usage: This class combines a number of
useful shortcuts for running synchronizers in a single interface.
"""
def __init__(
self,
base_dir: Path,
tmp_dir: Path = Path(".tmp"),
test_run: bool = False
):
super().__init__(Path(base_dir))
self._download_summary = DownloadSummary()
self._tmp_dir = TmpDir(self.resolve(tmp_dir))
self._test_run = test_run
@staticmethod
def enable_logging() -> None:
"""
Enable and configure logging via the logging module.
"""
enable_logging()
@staticmethod
def _print_transformables(transformables: List[TF]) -> None:
LOGGER.info("")
LOGGER.info("Results of the test run:")
for transformable in transformables:
LOGGER.info(transformable.path)
def _ilias(
self,
target: PathLike,
base_url: str,
crawl_function: Callable[[IliasCrawler], List[IliasDownloadInfo]],
authenticator: IliasAuthenticator,
cookies: Optional[PathLike],
dir_filter: IliasDirectoryFilter,
transform: Transform,
download_strategy: IliasDownloadStrategy,
clean: bool = True
) -> Organizer:
# pylint: disable=too-many-locals
cookie_jar = CookieJar(to_path(cookies) if cookies else None)
session = cookie_jar.create_session()
tmp_dir = self._tmp_dir.new_subdir()
organizer = Organizer(self.resolve(to_path(target)))
crawler = IliasCrawler(base_url, session, authenticator, dir_filter)
downloader = IliasDownloader(tmp_dir, organizer, session, authenticator, download_strategy)
cookie_jar.load_cookies()
info = crawl_function(crawler)
cookie_jar.save_cookies()
transformed = apply_transform(transform, info)
if self._test_run:
self._print_transformables(transformed)
return organizer
downloader.download_all(transformed)
cookie_jar.save_cookies()
if clean:
organizer.cleanup()
return organizer
@swallow_and_print_errors
def ilias_kit(
self,
target: PathLike,
course_id: str,
dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None,
username: Optional[str] = None,
password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
) -> Organizer:
"""
Synchronizes a folder with the ILIAS instance of the KIT.
Arguments:
target {Path} -- the target path to write the data to
course_id {str} -- the id of the main course page (found in the URL after ref_id
when opening the course homepage)
Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level, these directories and all of their content is skipped.
(default: {lambda x, y: True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambda x: x})
cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
"""
# This authenticator only works with the KIT ilias instance.
authenticator = KitShibbolethAuthenticator(username=username, password=password)
PRETTY.starting_synchronizer(target, "ILIAS", course_id)
organizer = self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_course(course_id),
authenticator=authenticator,
cookies=cookies,
dir_filter=dir_filter,
transform=transform,
download_strategy=download_strategy,
clean=clean,
)
self._download_summary.merge(organizer.download_summary)
return organizer
def print_summary(self) -> None:
"""
Prints the accumulated download summary.
"""
PRETTY.summary(self._download_summary)
@swallow_and_print_errors
def ilias_kit_personal_desktop(
self,
target: PathLike,
dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None,
username: Optional[str] = None,
password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
) -> Organizer:
"""
Synchronizes a folder with the ILIAS instance of the KIT. This method will crawl the ILIAS
"personal desktop" instead of a single course.
Arguments:
target {Path} -- the target path to write the data to
Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level, these directories and all of their content is skipped.
(default: {lambda x, y: True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambda x: x})
cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
"""
# This authenticator only works with the KIT ilias instance.
authenticator = KitShibbolethAuthenticator(username=username, password=password)
PRETTY.starting_synchronizer(target, "ILIAS", "Personal Desktop")
organizer = self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_personal_desktop(),
authenticator=authenticator,
cookies=cookies,
dir_filter=dir_filter,
transform=transform,
download_strategy=download_strategy,
clean=clean,
)
self._download_summary.merge(organizer.download_summary)
return organizer
@swallow_and_print_errors
def diva_kit(
self,
target: Union[PathLike, Organizer],
playlist_location: str,
transform: Transform = lambda x: x,
download_strategy: DivaDownloadStrategy = diva_download_new,
clean: bool = True
) -> Organizer:
"""
Synchronizes a folder with a DIVA playlist.
Arguments:
target {Union[PathLike, Organizer]} -- The target directory or an existing organizer to use.
playlist_location {str} -- the playlist id or the playlist URL
in the format 'https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271'
Keyword Arguments:
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambda x: x})
download_strategy {DivaDownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {diva_download_new})
clean {bool} -- Whether to clean up when the method finishes.
"""
tmp_dir = self._tmp_dir.new_subdir()
if playlist_location.startswith("http"):
playlist_id = DivaPlaylistCrawler.fetch_id(playlist_link=playlist_location)
else:
playlist_id = playlist_location
if target is None:
PRETTY.starting_synchronizer("None", "DIVA", playlist_id)
raise FatalException("Got 'None' as target directory, aborting")
if isinstance(target, Organizer):
organizer = target
else:
organizer = Organizer(self.resolve(to_path(target)))
PRETTY.starting_synchronizer(organizer.path, "DIVA", playlist_id)
crawler = DivaPlaylistCrawler(playlist_id)
downloader = DivaDownloader(tmp_dir, organizer, download_strategy)
info = crawler.crawl()
transformed = apply_transform(transform, info)
if self._test_run:
self._print_transformables(transformed)
return organizer
downloader.download_all(transformed)
if clean:
organizer.cleanup()
return organizer

PFERD/progress.py Normal file
@@ -0,0 +1,111 @@
"""
A small progress bar implementation.
"""
import sys
from dataclasses import dataclass
from types import TracebackType
from typing import Optional, Type
import requests
from rich.console import Console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
TextColumn, TimeRemainingColumn,
TransferSpeedColumn)
_progress: Progress = Progress(
TextColumn("[bold blue]{task.fields[name]}", justify="right"),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"",
DownloadColumn(),
"",
TransferSpeedColumn(),
"",
TimeRemainingColumn(),
console=Console(file=sys.stdout),
transient=True
)
def size_from_headers(response: requests.Response) -> Optional[int]:
"""
Return the size of the download based on the response headers.
Arguments:
response {requests.Response} -- the response
Returns:
Optional[int] -- the size
"""
if "Content-Length" in response.headers:
return int(response.headers["Content-Length"])
return None
@dataclass
class ProgressSettings:
"""
Settings you can pass to customize the progress bar.
"""
name: str
max_size: int
def progress_for(settings: Optional[ProgressSettings]) -> 'ProgressContextManager':
"""
Returns a context manager that displays progress
Returns:
ProgressContextManager -- the progress manager
"""
return ProgressContextManager(settings)
class ProgressContextManager:
"""
A context manager used for displaying progress.
"""
def __init__(self, settings: Optional[ProgressSettings]):
self._settings = settings
self._task_id: Optional[TaskID] = None
def __enter__(self) -> 'ProgressContextManager':
"""Context manager entry function."""
if not self._settings:
return self
_progress.start()
self._task_id = _progress.add_task(
self._settings.name,
total=self._settings.max_size,
name=self._settings.name
)
return self
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Removes the task."""
if self._task_id is None:
return None
_progress.remove_task(self._task_id)
if len(_progress.task_ids) == 0:
# We need to clean up after ourselves, as we were the last one
_progress.stop()
_progress.refresh()
return None
def advance(self, amount: float) -> None:
"""
Advances the progress bar.
"""
if self._task_id is not None:
_progress.advance(self._task_id, amount)

@@ -1,111 +0,0 @@
# Fakultät für Mathematik (FfM)
import getpass
import logging
import pathlib
import re
import bs4
import requests
from .organizer import Organizer
from .utils import stream_to_path, PrettyLogger
__all__ = ["Ti"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Ti:
BASE_URL = "http://ti.ira.uka.de/"
FILE_RE = re.compile(r"^.+\.pdf$")
def __init__(self, base_path):
self.base_path = base_path
self._session = requests.Session()
self._credentials = None
def synchronize(self, urlpart, to_dir, transform=lambda x: x,
filter=lambda x: True):
pretty.starting_synchronizer(to_dir, "Ti", urlpart)
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
self._reset_credentials()
available = self._find_available(urlpart)
for name, address in sorted(available.items()):
path = pathlib.PurePath(name)
if filter(path):
self._crawl(urlpart + address, path, orga, transform)
else:
logger.info(f"Skipping {name}/")
orga.clean_sync_dir()
orga.clean_temp_dir()
self._reset_credentials()
def _find_available(self, urlpart):
url = self.BASE_URL + urlpart
r = self._session.get(url)
soup = bs4.BeautifulSoup(r.text, "html.parser")
available = {}
if soup.find(href="./Vorlesung/Vorlesung.php"):
logger.info("Found Folien/")
available["Folien"] = "/Vorlesung/"
if soup.find(href="./Uebungen/Uebungen.php"):
logger.info("Found Blätter/")
available["Blätter"] = "/Uebungen/"
if soup.find(href="./Tutorien/Tutorien.php"):
logger.info("Found Tutorien/")
available["Tutorien"] = "/Tutorien/"
return available
def _crawl(self, urlpart, path, orga, transform):
url = self.BASE_URL + urlpart
r = self._session.get(url)
soup = bs4.BeautifulSoup(r.text, "html.parser")
for filelink in soup.find_all("a", href=self.FILE_RE):
filepath = path / filelink["href"]
fileurl = url + "/" + filelink["href"]
new_path = transform(filepath)
if new_path is None:
continue
logger.debug(f"Transformed from {filepath} to {new_path}")
temp_path = orga.temp_file()
self._download(fileurl, temp_path)
orga.add_file(temp_path, new_path)
def _get_credentials(self):
if self._credentials is None:
print("Please enter Ti credentials.")
username = getpass.getpass(prompt="Username: ")
password = getpass.getpass(prompt="Password: ")
self._credentials = (username, password)
return self._credentials
def _reset_credentials(self):
self._credentials = None
def _download(self, url, to_path):
while True:
username, password = self._get_credentials()
with self._session.get(url, stream=True, auth=(username, password)) as r:
if r.ok:
stream_to_path(r, to_path)
return
else:
print("Incorrect credentials.")
self._reset_credentials()

PFERD/tmp_dir.py Normal file
@@ -0,0 +1,79 @@
"""Helper functions and classes for temporary folders."""
import logging
import shutil
from pathlib import Path
from types import TracebackType
from typing import Optional, Type
from .location import Location
LOGGER = logging.getLogger(__name__)
class TmpDir(Location):
"""A temporary folder that can create files or nested temp folders."""
def __init__(self, path: Path):
"""Create a new temporary folder for the given path."""
super().__init__(path)
self._counter = 0
self.cleanup()
self.path.mkdir(parents=True, exist_ok=True)
def __str__(self) -> str:
"""Format the folder as a string."""
return f"Folder at {self.path}"
def __enter__(self) -> 'TmpDir':
"""Context manager entry function."""
return self
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Calls cleanup()."""
self.cleanup()
return None
def new_path(self, prefix: Optional[str] = None) -> Path:
"""
Return a unique path inside the directory. Doesn't create a file or
directory.
"""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
LOGGER.debug("Creating temp file %s", name)
return self.resolve(Path(name))
def new_subdir(self, prefix: Optional[str] = None) -> 'TmpDir':
"""
Create a new nested temporary folder and return it.
"""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
sub_path = self.resolve(Path(name))
sub_path.mkdir(parents=True)
LOGGER.debug("Creating temp dir %s at %s", name, sub_path)
return TmpDir(sub_path)
def cleanup(self) -> None:
"""Delete this folder and all contained files."""
LOGGER.debug("Deleting temp folder %s", self.path)
if self.path.resolve().exists():
shutil.rmtree(self.path.resolve())
def _inc_and_get_counter(self) -> int:
"""Get and increment the counter by one."""
counter = self._counter
self._counter += 1
return counter

PFERD/transform.py Normal file
@@ -0,0 +1,127 @@
"""
Transforms let the user define functions to decide where the downloaded files
should be placed locally. They let the user do more advanced things like moving
only files whose names match a regex, or renaming files from one numbering
scheme to another.
"""
from dataclasses import dataclass
from pathlib import PurePath
from typing import Callable, List, Optional, TypeVar
from .utils import PathLike, Regex, to_path, to_pattern
Transform = Callable[[PurePath], Optional[PurePath]]
@dataclass
class Transformable:
"""
An object that can be transformed by a Transform.
"""
path: PurePath
TF = TypeVar("TF", bound=Transformable)
def apply_transform(
transform: Transform,
transformables: List[TF],
) -> List[TF]:
"""
Apply a Transform to multiple Transformables, discarding those that were
not transformed by the Transform.
"""
result: List[TF] = []
for transformable in transformables:
new_path = transform(transformable.path)
if new_path:
transformable.path = new_path
result.append(transformable)
return result
# Transform combinators
keep = lambda path: path
def attempt(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
for transform in args:
result = transform(path)
if result:
return result
return None
return inner
def optionally(transform: Transform) -> Transform:
return attempt(transform, lambda path: path)
def do(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
current = path
for transform in args:
result = transform(current)
if result:
current = result
else:
return None
return current
return inner
def predicate(pred: Callable[[PurePath], bool]) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
if pred(path):
return path
return None
return inner
def glob(pattern: str) -> Transform:
return predicate(lambda path: path.match(pattern))
def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform:
source_path = to_path(source_dir)
target_path = to_path(target_dir)
def inner(path: PurePath) -> Optional[PurePath]:
if source_path in path.parents:
return target_path / path.relative_to(source_path)
return None
return inner
def move(source: PathLike, target: PathLike) -> Transform:
source_path = to_path(source)
target_path = to_path(target)
def inner(path: PurePath) -> Optional[PurePath]:
if path == source_path:
return target_path
return None
return inner
def rename(source: str, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
if path.name == source:
return path.with_name(target)
return None
return inner
def re_move(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(str(path))
if match:
groups = [match.group(0)]
groups.extend(match.groups())
return PurePath(target.format(*groups))
return None
return inner
def re_rename(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(path.name)
if match:
groups = [match.group(0)]
groups.extend(match.groups())
return path.with_name(target.format(*groups))
return None
return inner

@@ -1,64 +1,98 @@
import os
import sys
import pathlib
from colorama import Style
from colorama import Fore
__all__ = [
"get_base_dir",
"move",
"rename",
"stream_to_path",
"ContentTypeException",
"FileNotFoundException",
"PrettyLogger",
]
def get_base_dir(script_file):
return pathlib.Path(os.path.dirname(os.path.abspath(script_file)))
def move(path, from_folders, to_folders):
l = len(from_folders)
if path.parts[:l] == from_folders:
return pathlib.PurePath(*to_folders, *path.parts[l:])
def rename(path, to_name):
return pathlib.PurePath(*path.parts[:-1], to_name)
def stream_to_path(response, to_path, chunk_size=1024**2):
with open(to_path, 'wb') as fd:
for chunk in response.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def isOutputPipe():
"""Returns whether this program's output is attached to a pipe.
"""
return sys.stdout.isatty
A few utility bobs and bits.
"""
class ContentTypeException(Exception):
pass
import re
from pathlib import Path, PurePath
from typing import Optional, Tuple, Union
class FileNotFoundException(Exception):
pass
import bs4
import requests
class PrettyLogger:
from .progress import ProgressSettings, progress_for, size_from_headers
def __init__(self, logger):
self.logger = logger
PathLike = Union[PurePath, str, Tuple[str, ...]]
def modified_file(self, file_name):
self.logger.info(f"{Fore.MAGENTA}{Style.BRIGHT}Modified {file_name}.{Style.RESET_ALL}")
def new_file(self, file_name):
self.logger.info(f"{Fore.GREEN}{Style.BRIGHT}Created {file_name}.{Style.RESET_ALL}")
def to_path(pathlike: PathLike) -> Path:
"""
Convert a given PathLike into a Path.
"""
if isinstance(pathlike, tuple):
return Path(*pathlike)
return Path(pathlike)
def ignored_file(self, file_name):
self.logger.info(f"{Style.DIM}Ignored {file_name}.{Style.RESET_ALL}")
def starting_synchronizer(self, target_directory, synchronizer_name, subject=None):
subject_str = f"{subject} " if subject else ""
self.logger.info("")
self.logger.info((
f"{Fore.CYAN}{Style.BRIGHT}Synchronizing {subject_str}to {target_directory}"
f" using the {synchronizer_name} synchronizer.{Style.RESET_ALL}"
))
Regex = Union[str, re.Pattern]
def to_pattern(regex: Regex) -> re.Pattern:
"""
Convert a regex to a re.Pattern.
"""
if isinstance(regex, re.Pattern):
return regex
return re.compile(regex)
def soupify(response: requests.Response) -> bs4.BeautifulSoup:
"""
Wrap a requests response in a bs4 object.
"""
return bs4.BeautifulSoup(response.text, "html.parser")
def stream_to_path(
response: requests.Response,
target: Path,
progress_name: Optional[str] = None,
chunk_size: int = 1024 ** 2
) -> None:
"""
Download a requests response content to a file by streaming it. This
function avoids excessive memory usage when downloading large files. The
chunk_size is in bytes.
If progress_name is None, no progress bar will be shown. Otherwise a progress
bar will appear, if the download is bigger than an internal threshold.
"""
with response:
length = size_from_headers(response)
if progress_name and length and int(length) > 1024 * 1024 * 10: # 10 MiB
settings: Optional[ProgressSettings] = ProgressSettings(progress_name, length)
else:
settings = None
with open(target, 'wb') as file_descriptor:
with progress_for(settings) as progress:
for chunk in response.iter_content(chunk_size=chunk_size):
file_descriptor.write(chunk)
progress.advance(len(chunk))
def prompt_yes_no(question: str, default: Optional[bool] = None) -> bool:
"""
Prompts the user a yes/no question and returns their choice.
"""
if default is True:
prompt = "[Y/n]"
elif default is False:
prompt = "[y/N]"
else:
prompt = "[y/n]"
text = f"{question} {prompt} "
wrong_reply = "Please reply with 'yes'/'y' or 'no'/'n'."
while True:
response = input(text).strip().lower()
if response in {"yes", "ye", "y"}:
return True
if response in {"no", "n"}:
return False
if response == "" and default is not None:
return default
print(wrong_reply)

README.md
@@ -2,31 +2,48 @@
**P**rogramm zum **F**lotten, **E**infachen **R**unterladen von **D**ateien
- [Installation](#installation)
- [Upgrading from 2.0.0 to 2.1.0+](#upgrading-from-200-to-210)
- [Example setup](#example-setup)
- [Usage](#usage)
- [General concepts](#general-concepts)
- [Constructing transforms](#constructing-transforms)
- [Transform creators](#transform-creators)
- [Transform combinators](#transform-combinators)
- [A short, but commented example](#a-short-but-commented-example)
## Installation
Ensure that you have at least Python 3.7 installed (3.6 might also work, didn't
test it though).
Ensure that you have at least Python 3.8 installed.
To install PFERD or update your installation to the latest version, run this
wherever you want to install/have installed PFERD:
wherever you want to install or have already installed PFERD:
```
$ pip install git+https://github.com/Garmelon/PFERD@v1.1.3
$ pip install git+https://github.com/Garmelon/PFERD@v2.2.0
```
The use of [venv](https://docs.python.org/3/library/venv.html) is recommended.
The use of [venv] is recommended.
[venv]: https://docs.python.org/3/library/venv.html
### Upgrading from 2.0.0 to 2.1.0+
- The `IliasDirectoryType` type was renamed to `IliasElementType` and is now far more detailed.
The new values are: `REGULAR_FOLDER`, `VIDEO_FOLDER`, `EXERCISE_FOLDER`, `REGULAR_FILE`, `VIDEO_FILE`, `FORUM`, `EXTERNAL_LINK`.
- Forums and external links are skipped automatically if you use the `kit_ilias` helper; a custom `dir_filter` can also make use of the new element types, as sketched below.
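A minimal sketch of such a filter (skipping video folders is a purely illustrative policy; the import path matches the commented example further below):

```py
from pathlib import PurePath

from PFERD.ilias import IliasElementType


def my_dir_filter(path: PurePath, element_type: IliasElementType) -> bool:
    # Crawl everything except video folders (illustrative choice)
    return element_type != IliasElementType.VIDEO_FOLDER
```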
## Example setup
In this example, `python3` refers to at least Python 3.7.
In this example, `python3` refers to at least Python 3.8.
A full example setup and initial use could look like:
```
$ mkdir Vorlesungen
$ cd Vorlesungen
$ python3 -m venv .
$ . bin/activate
$ pip install git+https://github.com/Garmelon/PFERD@v1.1.3
$ curl -O https://raw.githubusercontent.com/Garmelon/PFERD/master/example_config.py
$ python3 -m venv .venv
$ . .venv/bin/activate
$ pip install git+https://github.com/Garmelon/PFERD@v2.2.0
$ curl -O https://raw.githubusercontent.com/Garmelon/PFERD/v2.2.0/example_config.py
$ python3 example_config.py
$ deactivate
```
@@ -34,7 +51,187 @@ $ deactivate
Subsequent runs of the program might look like:
```
$ cd Vorlesungen
$ . bin/activate
$ . .venv/bin/activate
$ python3 example_config.py
$ deactivate
```
If you just want to get started and crawl *your entire ILIAS Desktop* instead
of a given set of courses, please replace `example_config.py` with
`example_config_personal_desktop.py` in all of the instructions below (`curl` call and
`python3` run command).
## Usage
### General concepts
A PFERD config is a normal python file that starts multiple *synchronizers*
which do all the heavy lifting. While you can create and wire them up manually,
you are encouraged to use the helper methods provided in `PFERD.Pferd`.
The synchronizers take some input arguments specific to their service and a
*transform*. The transform receives the computed path of an element in ILIAS and
can return either an output path (so you can rename files or move them around as
you wish) or `None` if you do not want to save the given file.
Additionally the ILIAS synchronizer allows you to define a *crawl filter*. This
filter also receives the computed path as the input, but is only called for
*directories*. If you return `True`, the directory will be crawled and
searched. If you return `False` the directory will be ignored and nothing in it
will be passed to the transform.
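Concretely, a transform is just a function from a `PurePath` to an optional `PurePath`. A minimal hand-written sketch (the `Scratch/` folder name is purely illustrative) could look like this:

```py
from pathlib import PurePath
from typing import Optional


def my_transform(path: PurePath) -> Optional[PurePath]:
    # Ignore everything inside the (illustrative) "Scratch" folder
    if path.parts[:1] == ("Scratch",):
        return None
    # Keep all other files at their computed path
    return path
```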
### Constructing transforms
While transforms are just normal python functions, writing them by hand can
quickly become tedious. In order to help you with writing your own transforms
and filters, PFERD defines a few useful transform creators and combinators in
the `PFERD.transform` module:
#### Transform creators
These methods let you create a few basic transform building blocks:
- **`glob(glob)`**
Creates a transform that returns the unchanged path if the glob matches the path and `None` otherwise.
See also [Path.match].
Example: `glob("Übung/*.pdf")`
- **`predicate(pred)`**
Creates a transform that returns the unchanged path if `pred(path)` returns a truthy value.
Returns `None` otherwise.
Example: `predicate(lambda path: len(path.parts) == 3)`
- **`move_dir(source, target)`**
Creates a transform that moves all files from the `source` to the `target` directory.
Example: `move_dir("Übung/", "Blätter/")`
- **`move(source, target)`**
Creates a transform that moves the `source` file to `target`.
Example: `move("Vorlesung/VL02_Automten.pdf", "Vorlesung/VL02_Automaten.pdf")`
- **`rename(source, target)`**
Creates a transform that renames all files named `source` to `target`.
This transform works on the file names, not paths, and thus works no matter where the file is located.
Example: `rename("VL02_Automten.pdf", "VL02_Automaten.pdf")`
- **`re_move(regex, target)`**
Creates a transform that moves all files matching `regex` to `target`.
The transform calls `str.format` on the `target` string with the contents of the capturing groups before returning it.
The capturing groups can be accessed via their index.
See also [Match.group].
Example: `re_move(r"Übung/Blatt (\d+)\.pdf", "Blätter/Blatt_{1:0>2}.pdf")`
- **`re_rename(regex, target)`**
Creates a transform that renames all files matching `regex` to `target`.
This transform works on the file names, not paths, and thus works no matter where the file is located.
Example: `re_rename(r"VL(\d+)(.*)\.pdf", "Vorlesung_Nr_{1}__{2}.pdf")`
All movement or rename transforms above return `None` if a file doesn't match
their movement or renaming criteria. This enables them to be used as building
blocks to build up more complex transforms.
In addition, `PFERD.transform` also defines the `keep` transform which returns its input path unchanged.
This behaviour can be very useful when creating more complex transforms.
See below for example usage.
[Path.match]: https://docs.python.org/3/library/pathlib.html#pathlib.Path.match
[Match.group]: https://docs.python.org/3/library/re.html#re.Match.group
#### Transform combinators
These methods let you combine transforms into more complex transforms:
- **`optionally(transform)`**
Wraps a given transform and returns its result if it is not `None`.
Otherwise returns the input path unchanged.
See below for example usage.
* **`do(transforms)`**
Accepts a series of transforms and applies them in the given order to the result of the previous one.
If any transform returns `None`, `do` short-circuits and also returns `None`.
This can be used to perform multiple renames in a row:
```py
do(
# Move them
move_dir("Vorlesungsmaterial/Vorlesungsvideos/", "Vorlesung/Videos/"),
# Fix extensions (if they have any)
optionally(re_rename("(.*).m4v.mp4", "{1}.mp4")),
# Remove the 'dbs' prefix (if they have any)
optionally(re_rename("(?i)dbs-(.+)", "{1}")),
)
```
- **`attempt(transforms)`**
Applies the passed transforms in the given order until it finds one that does not return `None`.
If it does not find any, it returns `None`.
This can be used to give a list of possible transformations and automatically pick the first one that fits:
```py
attempt(
# Move all videos. If a video is passed in, this `re_move` will succeed
# and attempt short-circuits with the result.
re_move(r"Vorlesungsmaterial/.*/(.+?)\.mp4", "Vorlesung/Videos/{1}.mp4"),
# Move the whole folder to a nicer name - now without any mp4!
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
# If we got another file, keep it.
keep,
)
```
All of these combinators are used in the provided example configs, if you want
to see some more real-life usages.
### A short, but commented example
```py
from pathlib import Path, PurePath
from PFERD import Pferd
from PFERD.ilias import IliasElementType
from PFERD.transform import *
# This filter will later be used by the ILIAS crawler to decide whether it
# should crawl a directory (or directory-like structure).
def filter_course(path: PurePath, type: IliasElementType) -> bool:
# Note that glob returns a Transform, which is a function from PurePath ->
# Optional[PurePath]. Because of this, we need to apply the result of
# 'glob' to our input path. The returned value will be truthy (a Path) if
# the transform succeeded, or `None` if it failed.
# We need to crawl the 'Tutorien' folder as it contains one that we want.
if glob("Tutorien/")(path):
return True
# If we found 'Tutorium 10', keep it!
if glob("Tutorien/Tutorium 10")(path):
return True
# Discard all other folders inside 'Tutorien'
if glob("Tutorien/*")(path):
return False
# All other dirs (including subdirs of 'Tutorium 10') should be searched :)
return True
# This transform will later be used to rename a few files. It can also be used
# to ignore some files.
transform_course = attempt(
    # We don't care about the other tuts and would instead prefer a cleaner
    # directory structure.
    move_dir("Tutorien/Tutorium 10/", "Tutorium/"),
    # We don't want to modify any other files, so we're going to keep them
    # exactly as they are.
    keep
)
# Enable and configure the text output. Needs to be called before calling any
# other PFERD methods.
Pferd.enable_logging()
# Create a Pferd instance rooted in the same directory as the script file. This
# is not a test run, so files will be downloaded (default, can be omitted).
pferd = Pferd(Path(__file__).parent, test_run=False)
# Use the ilias_kit helper to synchronize an ILIAS course
pferd.ilias_kit(
    # The directory that all of the downloaded files should be placed in
    "My_cool_course/",
    # The course ID (found in the URL when on the course page in ILIAS)
    "course id",
    # A path to a cookie jar. If you synchronize multiple ILIAS courses,
    # setting this to a common value means you only have to log in once.
    cookies=Path("ilias_cookies.txt"),
    # A transform can rename, move or filter out certain files
    transform=transform_course,
    # A crawl filter limits what paths the crawler searches
    dir_filter=filter_course,
)
```


@@ -1,342 +1,131 @@
#!/bin/env python3
import argparse
from pathlib import Path, PurePath
import re
import sys
from PFERD import Pferd
from PFERD.ilias import IliasElementType
from PFERD.transform import (attempt, do, glob, keep, move, move_dir,
optionally, re_move, re_rename)
import PFERD
from PFERD.utils import get_base_dir, move, rename
tf_ss_2020_numerik = attempt(
re_move(r"Übungsblätter/(\d+)\. Übungsblatt/.*", "Blätter/Blatt_{1:0>2}.pdf"),
keep,
)
#PFERD.enable_logging(logging.DEBUG)
PFERD.enable_logging()
base_dir = get_base_dir(__file__)
tf_ss_2020_db = attempt(
move_dir("Begrüßungsvideo/", "Vorlesung/Videos/"),
do(
move_dir("Vorlesungsmaterial/Vorlesungsvideos/", "Vorlesung/Videos/"),
optionally(re_rename("(.*).m4v.mp4", "{1}.mp4")),
optionally(re_rename("(?i)dbs-(.+)", "{1}")),
),
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
keep,
)
# Semester 1
def gbi_filter(path):
# Tutorien rausfiltern
if path.parts[:1] == ("Tutoriumsfolien",):
if path.parts[1:] == (): return True
if path.parts[1:2] == ("Tutorium 15",): return True
tf_ss_2020_rechnernetze = attempt(
re_move(r"Vorlesungsmaterial/.*/(.+?)\.mp4", "Vorlesung/Videos/{1}.mp4"),
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
keep,
)
tf_ss_2020_sicherheit = attempt(
move_dir("Vorlesungsvideos/", "Vorlesung/Videos/"),
move_dir("Übungsvideos/", "Übung/Videos/"),
re_move(r"VL(.*)\.pdf", "Vorlesung/{1}.pdf"),
re_move(r"Übungsblatt (\d+)\.pdf", "Blätter/Blatt_{1:0>2}.pdf"),
move("Chiffrat.txt", "Blätter/Blatt_01_Chiffrat.txt"),
keep,
)
tf_ss_2020_pg = attempt(
move_dir("Vorlesungsaufzeichnungen/", "Vorlesung/Videos/"),
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
re_move(r"Übungen/uebungsblatt(\d+).pdf", "Blätter/Blatt_{1:0>2}.pdf"),
keep,
)
def df_ss_2020_or1(path: PurePath, _type: IliasElementType) -> bool:
if glob("Tutorien/")(path):
return True
if glob("Tutorien/Tutorium 10, dienstags 15:45 Uhr/")(path):
return True
if glob("Tutorien/*")(path):
return False
return True
def gbi_transform(path):
# Übungsblätter in Blätter/blatt_xx.pdf
new_path = move(path, ("Übungsblätter",), ("Blätter",))
if new_path is not None:
match = re.match(r"(\d+).aufgaben.pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
match = re.match(r"(\d+).loesungen.pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"loesung_{number:02}.pdf")
return new_path
# Folien in Folien/*
new_path = move(path, ("Vorlesung: Folien",), ("Folien",))
if new_path is not None: return new_path
# Skripte in Skripte/*
new_path = move(path, ("Vorlesung: Skript",), ("Skripte",))
if new_path is not None:
if new_path.name == "k-21-relationen-skript.pdf":
return rename(new_path, "21-relationen-skript.pdf")
return new_path
# Übungsfolien in Übung/*
new_path = move(path, ("große Übung: Folien",), ("Übung",))
if new_path is not None: return new_path
# Tutoriumsfolien in Tutorium/*
new_path = move(path, ("Tutoriumsfolien","Tutorium 15"), ("Tutorium",))
if new_path is not None:
if new_path.name == "GBI_Tut_2 (1).pdf":
return rename(new_path, "GBI_Tut_2.pdf")
if new_path.name == "GBI_Tut_7 (1).pdf":
return rename(new_path, "GBI_Tut_7.pdf")
return new_path
return path
def hm1_transform(path):
match = re.match(r"blatt(\d+).pdf", path.name)
if match:
new_path = move(path, (), ("Blätter",))
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
match = re.match(r"blatt(\d+).loesungen.pdf", path.name)
if match:
new_path = move(path, (), ("Blätter",))
number = int(match.group(1))
return rename(new_path, f"loesung_{number:02}.pdf")
return path
def la1_filter(path):
# Tutorien rausfitern
if path.parts[:1] == ("Tutorien",):
if path.parts[1:] == (): return True
if path.parts[1:2] == ("Tutorium 03 - Philipp Faller",): return True
if path.parts[1:2] == ("Tutorium 23 - Sebastian Faller",): return True
return False
return True
def la1_transform(path):
# Alle Übungsblätter in Blätter/blatt_xx.pdf
# Alles andere Übungsmaterial in Blätter/*
new_path = move(path, ("Übungen",), ("Blätter",))
if new_path is not None:
match = re.match(r"Blatt(\d+).pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
if new_path.name == "Lösungen zu Blatt 1, Aufgabe 1 und Blatt 2, Aufgabe 4..pdf":
return rename(new_path, "Lösungen zu Blatt 1, Aufgabe 1 und Blatt 2, Aufgabe 4.pdf")
return new_path
# Alles Tutoriengedöns von Philipp in Tutorium/Philipp/*
new_path = move(path, ("Tutorien","Tutorium 03 - Philipp Faller"), ("Tutorium","Philipp"))
if new_path is not None:
if new_path.name == "tut2.pdf":
return rename(new_path, "Tut2.pdf")
return new_path
# Alles Tutoriengedöns von Sebastian in Tutorium/Sebastian/*
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 1"), ("Tutorium","Sebastian", "tut01"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 2", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut02.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 3", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut03.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 4", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut04.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 5", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut05.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 6", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut06.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 7", "tut7.pdf"), ("Tutorium","Sebastian", "tut07.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 8", "tut8.pdf"), ("Tutorium","Sebastian", "tut08.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 9", "tut9.pdf"), ("Tutorium","Sebastian", "tut09.pdf"))
if new_path is not None: return new_path
if path.parts == ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 10", "tut10.pdf"): return None
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller"), ("Tutorium","Sebastian"))
if new_path is not None:
return new_path
# Übungs-Gedöns in Übung/*
new_path = move(path, ("Informatikervorlesung", "Übungsfolien"), ("Übung",))
if new_path is not None:
if new_path.name == "Übung_06_ausgewählte Folien.pdf":
return rename(new_path, "Übung_06_ausgewählte_Folien.pdf")
return new_path
# Vorlesungsfolien-Gedöns in Folien/*
new_path = move(path, ("Informatikervorlesung", "Folien.Notizen"), ("Folien",))
if new_path is not None:
return new_path
# Rest in Hauptverzeichnis
new_path = move(path, ("Informatikervorlesung",), ())
if new_path is not None:
# Rename filenames that are invalid on FAT systems
if new_path.name == "Evaluationsergebnisse: Übung.pdf":
return rename(new_path, "Evaluationsergebnisse_Übung.pdf")
if new_path.name == "Skript \"Lineare Algebra\" von Stefan Kühnlein.pdf":
return rename(new_path, "Skript Lineare Algebra von Stefan kühnlein.pdf")
return new_path
return path
def prog_filter(path):
# Tutorien rausfiltern
if path.parts[:1] == ("Tutorien",): return False
return True
def prog_transform(path):
# Übungsblätter in Blätter/*
new_path = move(path, ("Übungen",), ("Blätter",))
if new_path is not None:
if new_path.name == "assignmen04.pdf":
return rename(new_path, "assignment04.pdf")
return new_path
# Folien in Folien/*
new_path = move(path, ("Vorlesungsmaterial",), ("Folien",))
if new_path is not None:
if new_path.name == "00.1_Begruessung.pdf":
return rename(new_path, "00-01_Begruessung.pdf")
if new_path.name == "00.2_Organisatorisches.pdf":
return rename(new_path, "00-02_Organisatorisches.pdf")
if new_path.name == "01-01_ Einfache-Programme.pdf":
return rename(new_path, "01-01_Einfache_Programme.pdf")
if new_path.name == "13_Finden_und_ Beheben_von_Fehlern.pdf":
return rename(new_path, "13_Finden_und_Beheben_von_Fehlern.pdf")
return new_path
return path
# Semester 2
def algo1_filter(path):
# Tutorien rausfiltern
if path.parts[:1] == ("Tutorien",):
if path.parts[1:] == (): return True
#if path.parts[1:2] == ("Tutorium 15",): return True
return False
return True
def algo1_transform(path):
# Folien in Folien/*
new_path = move(path, ("Vorlesungsfolien",), ("Folien",))
if new_path is not None:
return new_path
return path
def hm2_transform(path):
match = re.match(r"blatt(\d+).pdf", path.name)
if match:
new_path = move(path, (), ("Blätter",))
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
match = re.match(r"blatt(\d+).loesungen.pdf", path.name)
if match:
new_path = move(path, (), ("Blätter",))
number = int(match.group(1))
return rename(new_path, f"loesung_{number:02}.pdf")
return path
def la2_filter(path):
# Tutorien rausfiltern
if path.parts[:1] == ("Tutorien",):
if path.parts[1:] == (): return True
#if path.parts[1:2] == ("Tutorium 15",): return True
return False
return True
def la2_transform(path):
# Folien in Folien/*
new_path = move(path, ("Vorlesungsmaterial",), ("Folien",))
if new_path is not None: return new_path
# Alle Übungsblätter in Blätter/blatt_xx.pdf
# Alles andere Übungsmaterial in Blätter/*
new_path = move(path, ("Übungen",), ("Blätter",))
if new_path is not None:
match = re.match(r"Blatt(\d+).pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
return new_path
return path
def swt1_filter(path):
# Tutorien rausfiltern
if path.parts[:1] == ("Tutorien",):
if path.parts[1:] == (): return True
#if path.parts[1:2] == ("Tutorium 15",): return True
return False
return True
def swt1_transform(path):
# Folien in Folien/*
new_path = move(path, ("Vorlesungsmaterial",), ("Folien",))
if new_path is not None: return new_path
# Übungsblätter in Blätter/*
new_path = move(path, ("Übungen",), ("Blätter",))
if new_path is not None: return new_path
return path
# Main part of the config
def main(args):
args = [arg.lower() for arg in args]
ffm = PFERD.FfM(base_dir)
ilias = PFERD.Ilias(base_dir, "cookie_jar")
norbert = PFERD.Norbert(base_dir)
# Semester 1
if not args or "gbi" in args:
ilias.synchronize("855240", "GBI",
transform=gbi_transform, filter=gbi_filter)
if not args or "hm1" in args:
ffm.synchronize("iana2/lehre/hm1info2018w", "HM1",
transform=hm1_transform)
if not args or "la1" in args:
ilias.synchronize("874938", "LA1",
transform=la1_transform, filter=la1_filter)
if not args or "prog" in args:
ilias.synchronize("851237", "Prog",
transform=prog_transform, filter=prog_filter)
if not args or "norbert" in args:
norbert.synchronize("Prog-Tut")
# Semester 2
if not args or "algo1" in args:
ilias.synchronize("959260", "Algo1",
transform=algo1_transform, filter=algo1_filter)
if not args or "hm2" in args:
ffm.synchronize("iana2/lehre/hm2info2019s", "HM2",
transform=hm2_transform)
if not args or "la2" in args:
ilias.synchronize("950588", "LA2",
transform=la2_transform, filter=la2_filter)
if not args or "swt1" in args:
ilias.synchronize("945596", "SWT1",
transform=swt1_transform, filter=swt1_filter)
tf_ss_2020_or1 = attempt(
move_dir("Vorlesung/Unbeschriebene Folien/", "Vorlesung/Folien/"),
move_dir("Video zur Organisation/", "Vorlesung/Videos/"),
keep,
)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--test-run", action="store_true")
parser.add_argument("synchronizers", nargs="*")
args = parser.parse_args()
pferd = Pferd(Path(__file__).parent, test_run=args.test_run)
pferd.enable_logging()
if not args.synchronizers or "numerik" in args.synchronizers:
pferd.ilias_kit(
target="Numerik",
course_id="1083036",
transform=tf_ss_2020_numerik,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "db" in args.synchronizers:
pferd.ilias_kit(
target="DB",
course_id="1101554",
transform=tf_ss_2020_db,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "rechnernetze" in args.synchronizers:
pferd.ilias_kit(
target="Rechnernetze",
course_id="1099996",
transform=tf_ss_2020_rechnernetze,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "sicherheit" in args.synchronizers:
pferd.ilias_kit(
target="Sicherheit",
course_id="1101980",
transform=tf_ss_2020_sicherheit,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "pg" in args.synchronizers:
pferd.ilias_kit(
target="PG",
course_id="1106095",
transform=tf_ss_2020_pg,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "or1" in args.synchronizers:
pferd.ilias_kit(
target="OR1",
course_id="1105941",
dir_filter=df_ss_2020_or1,
transform=tf_ss_2020_or1,
cookies="ilias_cookies.txt",
)
# Prints a summary listing all new, modified or deleted files
pferd.print_summary()
if __name__ == "__main__":
args = sys.argv[1:]
main(args)
main()


@@ -0,0 +1,38 @@
"""
This is a small config that just crawls the ILIAS Personal Desktop.
It does not filter or rename anything; it just gobbles up everything it can find.
Note that this still includes a test-run switch, so you can see what it *would* download.
You can enable that with the "--test-run" command line switch,
i.e. "python3 example_config_minimal.py --test-run".
"""
import argparse
from pathlib import Path
from PFERD import Pferd
def main() -> None:
    # Parse command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--test-run", action="store_true")
    args = parser.parse_args()

    # Create the Pferd helper instance
    pferd = Pferd(Path(__file__).parent, test_run=args.test_run)
    pferd.enable_logging()

    # Synchronize the personal desktop into the "ILIAS" directory.
    # It saves the cookies, so you only need to log in again when the ILIAS cookies expire.
    pferd.ilias_kit_personal_desktop(
        "ILIAS",
        cookies="ilias_cookies.txt",
    )

    # Prints a summary listing all new, modified or deleted files
    pferd.print_summary()


if __name__ == "__main__":
    main()

mypy.ini Normal file

@@ -0,0 +1,7 @@
[mypy]
disallow_untyped_defs = True
disallow_incomplete_defs = True
no_implicit_optional = True
[mypy-rich.*,bs4]
ignore_missing_imports = True


@@ -1,13 +1,13 @@
from setuptools import setup
from setuptools import find_packages, setup
setup(
name="PFERD",
version="1.1.3",
packages=["PFERD"],
version="2.2.0",
packages=find_packages(),
install_requires=[
"requests>=2.21.0",
"beautifulsoup4>=4.7.1",
"colorama>=0.4.1"
"rich>=2.1.0"
],
)