Compare commits

...

134 Commits

SHA1 Message Date
f614b95a00 Adjust version in setup.py 2020-05-30 19:07:02 +02:00
8198c9ecaa Reorder methods a bit 2020-05-30 19:06:36 +02:00
086b15d10f Crawl a bit more iteratively 2020-05-30 15:47:15 +02:00
9d6ce331a5 Use IliasCrawlerEntry entries in the ilias scraper 2020-05-30 15:20:51 +02:00
821c7ade26 Move video url extraction logic to crawler 2020-05-30 00:22:31 +02:00
b969a1854a Remove unneeded whitespace 2020-05-30 00:22:31 +02:00
62535b4452 Unpack videos in ILIAS downloader 2020-05-21 22:12:52 +02:00
c0056e5669 Correctly crawl video pages with multiple pages 2020-05-21 21:38:07 +02:00
cfe4a8fc0a Bump version to 2.0.0 2020-05-15 11:26:23 +00:00
95b9248a25 Clean up 2020-05-15 11:26:09 +00:00
1004fa40f8 Add personal desktop example config to README 2020-05-15 11:02:55 +02:00
e8ddb0ca04 Fix example config link in README 2020-05-15 11:02:45 +02:00
36c8785f15 Add example config that synchronizes the personal desktop 2020-05-15 11:02:13 +02:00
03a801eecc Correctly type hint swallow_and_print_errors decorator 2020-05-12 21:03:53 +02:00
072c6630bf Avoid logging import in config 2020-05-12 18:19:23 +00:00
4f56c8f192 Pass element type to ilias directory filter 2020-05-12 14:41:13 +02:00
4fdb67128d Fetch correct diva playlist id 2020-05-11 00:25:34 +02:00
a0f9d31d94 Use PrettyLogger warning everywhere 2020-05-10 21:56:12 +02:00
e7b08420ba Warn when a marked file is added again 2020-05-10 21:42:30 +02:00
c1b21f7772 Only remove a progress task when we added it 2020-05-10 12:28:30 +02:00
9850ab1d73 Allow crawling the ILIAS Personal Desktop 2020-05-10 12:16:42 +02:00
9950144e97 Allow passing a playlist URL to diva instead of an id 2020-05-10 11:17:13 +02:00
f6faacabb0 Move FatalException to errors.py 2020-05-09 00:11:21 +02:00
19c1e3ac6f Fail on invalid ILIAS course ids 2020-05-09 00:11:20 +02:00
afa48c2d2d Swallow and print errors instead of crashing 2020-05-09 00:10:54 +02:00
a4c518bf4c Update date find regex 2020-05-08 22:17:58 +02:00
057135022f Try to accept that life sometimes is in English 2020-05-08 22:10:43 +02:00
755e9aa0d3 Try to add support for Shibboleth TFA token 2020-05-08 21:52:51 +02:00
c9deca19ca Remove walrus to lower needed python version 2020-05-08 21:21:33 +02:00
bb048c3a6d Apparently we want Python 3.8 2020-05-08 21:04:13 +02:00
33fcd307b2 Adjust install directions 2020-05-08 20:53:41 +02:00
a0c5572b59 Fix progress bars swallowing a line when they shouldn't 2020-05-08 19:55:53 +02:00
2d20d2934c Color warning differently 2020-05-08 19:52:45 +02:00
2c48ab66d4 Use rich for log colorization 2020-05-08 19:31:54 +02:00
104b838aed Automatically discover packages in setup.py 2020-05-08 18:58:44 +02:00
7f10931be8 Add rich to setup.py 2020-05-08 18:49:36 +02:00
07c225bc20 Expand README. I did not proofread this thing :( 2020-05-08 18:47:58 +02:00
56f2394001 Add a download progress bar 2020-05-08 17:09:56 +02:00
fdff8bc40e example_config: Change db transform 2020-05-01 13:31:29 +02:00
bee3d70998 Added a diva playlist downloader 2020-04-30 17:18:45 +02:00
42345ecc61 Demangle "Morgen" too 2020-04-30 12:05:25 +02:00
920d521d68 Change PrettyLogger.warn to PrettyLogger.warning 2020-04-25 20:11:51 +02:00
e0b46a306a Use warn method in IliasCrawler 2020-04-25 20:07:40 +02:00
8a42a2a396 Move logging into its own file 2020-04-25 20:02:01 +02:00
80247400a4 Debug log when starting an ilias download 2020-04-25 13:02:07 +02:00
13c5a29ff0 Fix and extend example config 2020-04-24 18:41:22 +00:00
1aaa6e7ab5 Use PathLike everywhere 2020-04-24 18:41:14 +00:00
7f53543324 Satisfy pylint and add todo 2020-04-24 18:26:28 +00:00
292e516297 Change crawler and downloader output 2020-04-24 18:24:44 +00:00
8258fa8919 Add test run option to PFERD 2020-04-24 18:00:21 +00:00
5b929f09a2 Move download strategies to downloader
Also fixes an issue where the downloader didn't mark files that were not
downloaded due to the strategy used.
2020-04-24 14:27:40 +00:00
4d32f863bc Clean up organizer after synchronizing 2020-04-24 14:17:23 +00:00
4e7333b396 Allow specifying paths as strings in Pferd 2020-04-24 11:50:40 +00:00
4c0e3b493a Use download_modified_or_new as default strategy 2020-04-24 13:48:06 +02:00
2de079a5d3 Add a few Transform combinators 2020-04-24 11:35:46 +00:00
509e624d47 Satisfy pyling. Useful docstrings? Not quite sure. 2020-04-23 20:35:59 +02:00
ca8fcf7a1d Somewhat elaborate example_config 2020-04-23 20:22:41 +02:00
980f69b5af Fix organizer marking itself causing an error 2020-04-23 20:02:05 +02:00
0b00a9c26b Log when starting to synchronize 2020-04-23 19:56:37 +02:00
1ef85c45e5 Switch Transform to PurePath 2020-04-23 17:40:43 +00:00
5ef5a56e69 Extract Location into separate file 2020-04-23 17:38:28 +00:00
f3f4be2690 More free functions 2020-04-23 19:21:49 +02:00
076b8c5a1f Add download strategies to save bandwith
Only download files that are newer than the local version.
2020-04-23 18:29:20 +02:00
13bc78c889 Display reason for ignoring an element in ilias crawler 2020-04-23 13:54:58 +02:00
dc964a9d98 Remove finished TODOs 2020-04-23 13:30:34 +02:00
c2b14f3db9 ilias crawler: Use direct download link if possible 2020-04-23 13:08:12 +02:00
4b59a7c375 Move around TODOs 2020-04-23 10:49:01 +00:00
3a57430893 Fix type errors in example_config 2020-04-23 12:35:58 +02:00
bef210ae77 Rename and implement IliasDirectoryFilter 2020-04-23 12:35:18 +02:00
ea005517cf Only remove folders if they exist in tmpdir 2020-04-23 12:09:45 +02:00
3841f27aab Add example config 2020-04-23 09:50:32 +00:00
df0eb84a44 Fix TmpDir and Location
TmpDir: Clean up before and after, not just after
Location: Resolve path so that parent check works properly
2020-04-23 09:50:32 +00:00
2de4255a78 Add Pferd class 2020-04-23 09:50:32 +00:00
3c808879c9 Add Transforms and Transformables 2020-04-22 18:25:09 +00:00
a051e3bcca ilias crawler: Add some unhelpful documentation 2020-04-22 17:58:19 +02:00
eb7df036df WIP: ilias crawler: Also crawl assignments 2020-04-22 14:32:20 +02:00
23db59e733 WIP: ilias-crawler: Demangle dates 2020-04-22 12:58:44 +02:00
ac65b06a8e Satisfy pylint a bit 2020-04-22 01:37:34 +02:00
8891041069 WIP: crawler: Add opencast video crawler 2020-04-21 23:01:19 +02:00
70d63e3e90 WIP: Start small ILIAS crawler 2020-04-21 13:32:03 +02:00
b2a7af2e3e Store modification_date in IliasDownloadInfo, remove parameters 2020-04-21 13:31:50 +02:00
23bed48c8c Satisfy autopep8 2020-04-21 13:30:42 +02:00
0926d33798 Use downloader-specific data classes 2020-04-20 18:07:45 +00:00
55ba2f4070 Fix pylint in downloaders 2020-04-20 19:49:15 +02:00
d18b48aaf4 Stream in http downloader 2020-04-20 19:45:25 +02:00
4ef0ffe3bf Listen to pylint and mypy 2020-04-20 17:44:58 +00:00
ce77995c8f Rename http downloader module 2020-04-20 17:08:51 +00:00
ed9245c14d Remove old organizer 2020-04-20 18:50:23 +02:00
01e6972c96 Add ilias downloader 2020-04-20 18:49:01 +02:00
8181ae5b17 Guard http response in context manager 2020-04-20 18:47:46 +02:00
6407190ae0 Soupify requests responses properly 2020-04-20 16:38:30 +00:00
87395faac2 Add base for simple HTTP downloader 2020-04-20 17:43:59 +02:00
a9e6e7883d Create temp dir folder in constructor 2020-04-20 17:43:59 +02:00
154d6b29dd Listen to pylint 2020-04-20 15:16:22 +00:00
62ac569ec4 Revert "Add proposed crawler entry type"
This reverts commit 9f1a0a58ab.

Each crawler will have its own data class.
2020-04-20 16:59:20 +02:00
9f1a0a58ab Add proposed crawler entry type 2020-04-20 16:54:47 +02:00
879a2c7c80 Rewrite ILIAS authenticator 2020-04-20 14:26:30 +00:00
ff06c5215e Fix authenticator 2020-04-20 14:26:29 +00:00
135a8dce4b Fix resolve_path allowing paths outside its folder
This happened if the directory name was a prefix of the offending file name.
2020-04-20 16:07:14 +02:00
63bbcad918 Add resolve method to tmp_dir 2020-04-20 15:40:07 +02:00
6584d6a905 Elaborate accept_file in new_organizer 2020-04-20 15:40:07 +02:00
5990098ef8 Add UserPassAuthenticator 2020-04-20 13:26:45 +00:00
f3d3d6bb65 Add some docs to cookie_jar 2020-04-20 14:38:03 +02:00
b2fe7cc064 Add preliminary logging to organizer and tmp_dir 2020-04-20 14:37:44 +02:00
930d821dd7 Add a simple organizer 2020-04-20 14:29:48 +02:00
5c2ff14839 Add "prompt_yes_no" to utils 2020-04-20 14:29:48 +02:00
a3d6dc7873 Clean up temp_folder 2020-04-20 14:29:48 +02:00
53ad1c924b Add cookie jar 2020-04-20 11:35:26 +00:00
8c431c7d81 Add a simple temporary folder 2020-04-20 12:08:52 +02:00
d5dd5aac06 Fix some mypy errors 2020-04-20 01:54:47 +00:00
7d48972967 Configure mypy 2020-04-19 19:50:17 +00:00
25043a4aaa Remove unnecessary files
Also document some plans for the new program structure in REWRITE.md
2020-04-19 19:49:43 +00:00
7ebeef5873 Clean up gitignore 2020-04-19 18:47:44 +00:00
9b658776ca Merge pull request #6 from I-Al-Istannen/master
Hack in support for TI exams
2020-03-01 23:09:32 +00:00
cf3553175f Add OS_Exams synchronizer 2020-02-27 14:51:29 +01:00
bf8b3cf9f7 Hack in support for TI exams
This just adds an additional crawl check for AlteKlausuren. This is not
present on the root site but at the suffix `/Klausuren`.
Example config:

```py
 # The "Klausur" needs to be copied verbatim!
ti.synchronize("Klausur", "sync dir name",
               transform=ro_19_klausur_transform, filter=ro_19_klausur_filter)
```
2020-02-24 20:58:27 +01:00
4a5600d5ce Merge pull request #5 from I-Al-Istannen/master
Lose 50 minutes of my life (and fix the TGI tut)
2019-12-12 11:52:05 +00:00
f5bc49160f Lose 50 minutes of my life (and fix the TGI tut) 2019-12-12 12:50:16 +01:00
fa5f82d312 Merge pull request #4 from I-Al-Istannen/master
[TGI] Add TGi tut
2019-11-18 22:12:32 +00:00
4433696509 [TGI] Add TGi tut 2019-11-18 09:58:16 +01:00
1f5475abc5 Merge pull request #3 from I-Al-Istannen/master
Download all TGI files and not just lectures
2019-10-17 21:10:21 +00:00
1407c6d264 Download all TGI files and not just lectures 2019-10-17 22:14:32 +02:00
e152bfc4a3 Merge pull request #2 from I-Al-Istannen/master
Add support for TGI
2019-10-15 20:01:10 +00:00
1973c931bd Add support for other years in TGI downloader 2019-10-15 15:37:52 +02:00
458cc1c6d6 Add support for TGI website 2019-10-15 15:34:59 +02:00
52852d11a6 Bump version to 1.1.8 2019-09-22 11:56:41 +00:00
f94629a7fa Fix exceptions with weird content types
(hopefully)
2019-09-22 11:55:47 +00:00
c8ee456d33 Bump version to 1.1.7 2019-07-26 08:14:55 +00:00
2752e98621 Fix relative url joining in ti downloader 2019-07-26 10:06:01 +02:00
1572e11da8 Bump version to 1.1.6 2019-07-05 08:49:26 +00:00
ea01dc7cb2 Allow even more types of files 2019-07-05 08:48:43 +00:00
aba8d46d26 Bump version to 1.1.5 2019-07-04 12:17:33 +00:00
77056e6f8d Allow more types of files 2019-07-04 12:16:42 +00:00
064f12c14c Ignore mypy files 2019-07-04 12:16:26 +00:00
30 changed files with 2678 additions and 1128 deletions

.gitignore

@@ -1,12 +1,7 @@
# python stuff
__pycache__/
# venv stuff
bin/
include/
lib/
lib64
pyvenv.cfg
.venv/
.mypy_cache/
.tmp/
pip-selfcheck.json
.env
.vscode
ilias_cookies.txt

PFERD/__init__.py

@@ -1,37 +1,8 @@
import logging
# pylint: disable=invalid-name
from .ffm import *
from .ilias import *
from .norbert import *
from .ti import *
from .utils import *
"""
This module exports only what you need for a basic configuration. If you want a
more complex configuration, you need to import the other submodules manually.
"""
__all__ = ["STYLE", "FORMAT", "DATE_FORMAT", "FORMATTER", "enable_logging"]
__all__ += ffm.__all__
__all__ += ilias.__all__
__all__ += norbert.__all__
__all__ += ti.__all__
__all__ += utils.__all__
STYLE = "{"
FORMAT = "[{levelname:<7}] {message}"
DATE_FORMAT = "%F %T"
FORMATTER = logging.Formatter(
fmt=FORMAT,
datefmt=DATE_FORMAT,
style=STYLE,
)
def enable_logging(name="PFERD", level=logging.INFO):
handler = logging.StreamHandler()
handler.setFormatter(FORMATTER)
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(handler)
# This should be logged by our own handler, and not the root logger's
# default handler, so we don't pass it on to the root logger.
logger.propagate = False
from .pferd import Pferd

PFERD/authenticators.py (new file)

@@ -0,0 +1,125 @@
"""
General authenticators useful in many situations
"""
import getpass
from typing import Optional, Tuple
class TfaAuthenticator:
# pylint: disable=too-few-public-methods
"""
An authenticator for a TFA token. Always prompts the user, as the token can not be cached.
"""
def __init__(self, reason: str):
"""
Create a new tfa authenticator.
Arguments:
reason {str} -- the reason for obtaining the credentials
"""
self._reason = reason
def get_token(self) -> str:
# pylint: disable=no-self-use
"""
Prompts the user for the token and returns it.
"""
print(f"Enter credentials ({self._reason})")
return getpass.getpass("TFA Token: ")
class UserPassAuthenticator:
"""
An authenticator for username-password combinations that prompts the user
for missing information.
"""
def __init__(
self,
reason: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""
reason - what the credentials are used for
username - the username (if already known)
password - the password (if already known)
"""
self._reason = reason
self._given_username = username
self._given_password = password
self._username = username
self._password = password
def get_credentials(self) -> Tuple[str, str]:
"""
Returns a tuple (username, password). Prompts user for username or
password when necessary.
"""
if self._username is None and self._given_username is not None:
self._username = self._given_username
if self._password is None and self._given_password is not None:
self._password = self._given_password
if self._username is None or self._password is None:
print(f"Enter credentials ({self._reason})")
username: str
if self._username is None:
username = input("Username: ")
self._username = username
else:
username = self._username
password: str
if self._password is None:
password = getpass.getpass(prompt="Password: ")
self._password = password
else:
password = self._password
return (username, password)
@property
def username(self) -> str:
"""
The username. Accessing this property may cause the authenticator to
prompt the user.
"""
(username, _) = self.get_credentials()
return username
@property
def password(self) -> str:
"""
The password. Accessing this property may cause the authenticator to
prompt the user.
"""
(_, password) = self.get_credentials()
return password
def invalidate_credentials(self) -> None:
"""
Marks the credentials as invalid. If only a username was supplied in
the constructor, assumes that the username is valid and only the
password is invalid. If only a password was supplied in the
constructor, assumes that the password is valid and only the username
is invalid. Otherwise, assumes that username and password are both
invalid.
"""
self._username = None
self._password = None
if self._given_username is not None and self._given_password is not None:
self._given_username = None
self._given_password = None
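
A minimal usage sketch for the two authenticators above (the reason string and username are placeholders; the rest follows the API shown in this diff):

```py
from PFERD.authenticators import TfaAuthenticator, UserPassAuthenticator

# Prompts only for whatever was not supplied up front.
auth = UserPassAuthenticator("KIT ILIAS Shibboleth", username="uabcd")
username, password = auth.get_credentials()  # asks only for the missing password

# After a rejected login: the supplied username is assumed to still be valid,
# so only the password is prompted for again on the next access.
auth.invalidate_credentials()

# TFA tokens can not be cached, so this always prompts.
tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth")
token = tfa_auth.get_token()
```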

PFERD/cookie_jar.py (new file)

@@ -0,0 +1,69 @@
"""A helper for requests cookies."""
import logging
from http.cookiejar import LoadError, LWPCookieJar
from pathlib import Path
from typing import Optional
import requests
LOGGER = logging.getLogger(__name__)
class CookieJar:
"""A cookie jar that can be persisted."""
def __init__(self, cookie_file: Optional[Path] = None) -> None:
"""Create a new cookie jar at the given path.
If the path is None, the cookies will not be persisted.
"""
self._cookies: LWPCookieJar
if cookie_file is None:
self._cookies = LWPCookieJar()
else:
self._cookies = LWPCookieJar(cookie_file)
@property
def cookies(self) -> LWPCookieJar:
"""Return the requests cookie jar."""
return self._cookies
def load_cookies(self) -> None:
"""Load all cookies from the file given in the constructor."""
if self._cookies.filename is None:
return
try:
LOGGER.info("Loading old cookies from %s", self._cookies.filename)
self._cookies.load(ignore_discard=True)
except (FileNotFoundError, LoadError):
LOGGER.warning(
"No valid cookie file found at %s, continuing with no cookies",
self._cookies.filename
)
def save_cookies(self, reason: Optional[str] = None) -> None:
"""Save the cookies in the file given in the constructor."""
if self._cookies.filename is None:
return
if reason is None:
LOGGER.info("Saving cookies")
else:
LOGGER.info("Saving cookies (%s)", reason)
# TODO figure out why ignore_discard is set
# TODO possibly catch a few more exceptions
self._cookies.save(ignore_discard=True)
def create_session(self) -> requests.Session:
"""Create a new session using the cookie jar."""
sess = requests.Session()
# From the request docs: "All requests code should work out of the box
# with externally provided instances of CookieJar, e.g. LWPCookieJar
# and FileCookieJar."
sess.cookies = self.cookies # type: ignore
return sess
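
A short sketch of wiring up the cookie jar, using only the methods above; the file name is just an example:

```py
from pathlib import Path

from PFERD.cookie_jar import CookieJar

jar = CookieJar(Path("ilias_cookies.txt"))  # pass None to keep cookies in memory only
jar.load_cookies()   # falls back to an empty jar if no valid cookie file exists yet

session = jar.create_session()   # a requests.Session backed by this jar
# ... perform authenticated requests with `session` ...

jar.save_cookies("initial login")   # the reason only changes the log message
```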

PFERD/diva.py (new file)

@@ -0,0 +1,169 @@
"""
Utility functions and a scraper/downloader for the KIT DIVA portal.
"""
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, List, Optional
import requests
from .errors import FatalException
from .logging import PrettyLogger
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
@dataclass
class DivaDownloadInfo(Transformable):
"""
Information about a DIVA video
"""
url: str
DivaDownloadStrategy = Callable[[Organizer, DivaDownloadInfo], bool]
def diva_download_new(organizer: Organizer, info: DivaDownloadInfo) -> bool:
"""
Accepts only new files.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists():
return True
PRETTY.ignored_file(info.path, "local file exists")
return False
class DivaPlaylistCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for DIVA playlists.
"""
_PLAYLIST_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/detail/"
_COLLECTION_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/collection.json"
def __init__(self, playlist_id: str):
self._id = playlist_id
@classmethod
def fetch_id(cls, playlist_link: str) -> str:
"""
Fetches the ID for a playerlist, given the base link
(e.g. https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271).
Raises a FatalException, if the id can not be resolved
"""
match = re.match(r".+#/details/(.+)", playlist_link)
if match is None:
raise FatalException(
"DIVA: Invalid playlist link format, could not extract details."
)
base_name = match.group(1)
response = requests.get(cls._PLAYLIST_BASE_URL + base_name + ".json")
if response.status_code != 200:
raise FatalException(
f"DIVA: Got non-200 status code ({response.status_code}))"
f"when requesting {response.url!r}!"
)
body = response.json()
if body["error"]:
raise FatalException(f"DIVA: Server returned error {body['error']!r}.")
return body["result"]["collection"]["id"]
def crawl(self) -> List[DivaDownloadInfo]:
"""
Crawls the playlist given in the constructor.
"""
response = requests.get(self._COLLECTION_BASE_URL, params={"collection": self._id})
if response.status_code != 200:
raise FatalException(f"Server returned status {response.status_code}.")
body = response.json()
if body["error"]:
raise FatalException(f"Server returned error {body['error']!r}.")
result = body["result"]
if result["resultCount"] > result["pageSize"]:
PRETTY.warning("Did not receive all results, some will be missing")
download_infos: List[DivaDownloadInfo] = []
for video in result["resultList"]:
title = video["title"]
collection_title = self._follow_path(["collection", "title"], video)
url = self._follow_path(
["resourceList", "derivateList", "mp4", "url"],
video
)
if url and collection_title and title:
path = Path(collection_title, title + ".mp4")
download_infos.append(DivaDownloadInfo(path, url))
else:
PRETTY.warning(f"Incomplete video found: {title!r} {collection_title!r} {url!r}")
return download_infos
@staticmethod
def _follow_path(path: List[str], obj: Any) -> Optional[Any]:
"""
Follows a property path through an object, bailing at the first None.
"""
current = obj
for path_step in path:
if path_step in current:
current = current[path_step]
else:
return None
return current
class DivaDownloader:
"""
A downloader for DIVA videos.
"""
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: DivaDownloadStrategy):
self._tmp_dir = tmp_dir
self._organizer = organizer
self._strategy = strategy
self._session = requests.session()
def download_all(self, infos: List[DivaDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: DivaDownloadInfo) -> None:
"""
Download a single file.
"""
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
with self._session.get(info.url, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path)
else:
PRETTY.warning(f"Could not download file, got response {response.status_code}")
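
Putting the DIVA pieces together could look as follows; the playlist link is the example from the fetch_id docstring, and tmp_dir/organizer are assumed to be a TmpDir and an Organizer created elsewhere (their construction is not part of this file):

```py
from PFERD.diva import DivaDownloader, DivaPlaylistCrawler, diva_download_new

link = "https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271"
playlist_id = DivaPlaylistCrawler.fetch_id(link)  # raises FatalException on bad links

crawler = DivaPlaylistCrawler(playlist_id)
infos = crawler.crawl()  # List[DivaDownloadInfo]

# tmp_dir: TmpDir, organizer: Organizer -- assumed to already exist.
downloader = DivaDownloader(tmp_dir, organizer, diva_download_new)
downloader.download_all(infos)
```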

PFERD/downloaders.py (new file)

@@ -0,0 +1,72 @@
"""
General downloaders useful in many situations
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import requests
import requests.auth
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
@dataclass
class HttpDownloadInfo(Transformable):
"""
This class describes a single file to be downloaded.
"""
url: str
parameters: Dict[str, Any] = field(default_factory=dict)
class HttpDownloader:
"""A HTTP downloader that can handle HTTP basic auth."""
def __init__(
self,
tmp_dir: TmpDir,
organizer: Organizer,
username: Optional[str],
password: Optional[str],
):
"""Create a new http downloader."""
self._organizer = organizer
self._tmp_dir = tmp_dir
self._username = username
self._password = password
self._session = self._build_session()
def _build_session(self) -> requests.Session:
session = requests.Session()
if self._username and self._password:
session.auth = requests.auth.HTTPBasicAuth(
self._username, self._password
)
return session
def download_all(self, infos: List[HttpDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: HttpDownloadInfo) -> None:
"""
Download a single file.
"""
with self._session.get(info.url, params=info.parameters, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path)
else:
# TODO use proper exception
raise Exception(f"Could not download file, got response {response.status_code}")
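
A sketch of the HTTP downloader in use. The URL and target path are placeholders, and the positional `path` argument is the field inherited from `Transformable`, mirroring how `DivaDownloadInfo` is constructed in PFERD/diva.py above:

```py
from pathlib import PurePath

from PFERD.downloaders import HttpDownloader, HttpDownloadInfo

# tmp_dir: TmpDir, organizer: Organizer -- assumed to already exist.
# A username/password pair enables HTTP basic auth; None disables it.
downloader = HttpDownloader(tmp_dir, organizer, username=None, password=None)

info = HttpDownloadInfo(PurePath("slides/lecture-01.pdf"), "https://example.com/lecture-01.pdf")
downloader.download_all([info])
```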

PFERD/errors.py (new file)

@@ -0,0 +1,39 @@
"""
An error logging decorator.
"""
import logging
from typing import Any, Callable, TypeVar, cast
from rich.console import Console
from .logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class FatalException(Exception):
"""
A fatal exception occurred. Recovery is not possible.
"""
TFun = TypeVar('TFun', bound=Callable[..., Any])
def swallow_and_print_errors(function: TFun) -> TFun:
"""
Decorates a function, swallows all errors, logs them and returns none if one occurred.
"""
def inner(*args: Any, **kwargs: Any) -> Any:
# pylint: disable=broad-except
try:
return function(*args, **kwargs)
except FatalException as error:
PRETTY.error(str(error))
return None
except Exception as error:
Console().print_exception()
return None
return cast(TFun, inner)
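
The decorator above can be used like this; the function and its body are purely illustrative:

```py
from PFERD.errors import FatalException, swallow_and_print_errors

@swallow_and_print_errors
def synchronize_course(course_id: str) -> None:
    # A FatalException is logged via PrettyLogger.error, any other exception
    # gets a rich traceback; either way the decorator returns None instead of
    # letting the error propagate.
    if not course_id:
        raise FatalException("No course id given")
```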

PFERD/ffm.py (deleted)

@@ -1,61 +0,0 @@
# Fakultät für Mathematik (FfM)
import logging
import pathlib
import re
import bs4
import requests
from .organizer import Organizer
from .utils import stream_to_path, PrettyLogger
__all__ = ["FfM"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class FfM:
BASE_URL = "http://www.math.kit.edu/"
LINK_RE = re.compile(r"^https?://www.math.kit.edu/.*/(.*\.pdf)$")
def __init__(self, base_path):
self.base_path = base_path
self._session = requests.Session()
def synchronize(self, urlpart, to_dir, transform=lambda x: x):
pretty.starting_synchronizer(to_dir, "FfM", urlpart)
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
self._crawl(orga, urlpart, transform)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self, orga, urlpart, transform):
url = self.BASE_URL + urlpart
r = self._session.get(url)
soup = bs4.BeautifulSoup(r.text, "html.parser")
for found in soup.find_all("a", href=self.LINK_RE):
url = found["href"]
filename = re.match(self.LINK_RE, url).group(1).replace("/", ".")
logger.debug(f"Found file {filename} at {url}")
old_path = pathlib.PurePath(filename)
new_path = transform(old_path)
if new_path is None:
continue
logger.debug(f"Transformed from {old_path} to {new_path}")
temp_path = orga.temp_file()
self._download(url, temp_path)
orga.add_file(temp_path, new_path)
def _download(self, url, to_path):
with self._session.get(url, stream=True) as r:
stream_to_path(r, to_path)

PFERD/ilias.py (deleted)

@@ -1,109 +0,0 @@
# ILIAS
import logging
import pathlib
import re
from .ilias_authenticators import ShibbolethAuthenticator
from .organizer import Organizer
from .utils import PrettyLogger
__all__ = ["Ilias"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Ilias:
FILE_RE = re.compile(r"goto\.php\?target=(file_\d+_download)")
DIR_RE = re.compile(r"ilias\.php\?ref_id=(\d+)")
def __init__(self, base_path, cookie_file):
self.base_path = base_path
self._auth = ShibbolethAuthenticator(base_path / cookie_file)
def synchronize(self, ref_id, to_dir, transform=lambda x: x, filter=lambda x: True):
pretty.starting_synchronizer(to_dir, "ILIAS", f"ref_id {ref_id}")
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
files = self._crawl(pathlib.PurePath(), f"fold_{ref_id}", filter)
self._download(orga, files, transform)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self, dir_path, dir_id, filter_):
soup = self._auth.get_webpage(dir_id)
found_files = []
files = self._find_files(soup)
for (name, file_id) in files:
path = dir_path / name
found_files.append((path, file_id))
logger.debug(f"Found file {path}")
dirs = self._find_dirs(soup)
for (name, ref_id) in dirs:
path = dir_path / name
logger.debug(f"Found dir {path}")
if filter_(path):
logger.info(f"Searching {path}")
files = self._crawl(path, ref_id, filter_)
found_files.extend(files)
else:
logger.info(f"Not searching {path}")
return found_files
def _download(self, orga, files, transform):
for (path, file_id) in sorted(files):
to_path = transform(path)
if to_path is not None:
temp_path = orga.temp_file()
self._auth.download_file(file_id, temp_path)
orga.add_file(temp_path, to_path)
def _find_files(self, soup):
files = []
file_names = set()
found = soup.find_all("a", {"class": "il_ContainerItemTitle", "href": self.FILE_RE})
for element in found:
file_stem = element.string.strip().replace("/", ".")
file_type = element.parent.parent.parent.find("div", {"class": "il_ItemProperties"}).find("span").string.strip()
file_id = re.search(self.FILE_RE, element.get("href")).group(1)
file_name = f"{file_stem}.{file_type}"
if file_name in file_names:
counter = 1
while True:
file_name = f"{file_stem} (duplicate {counter}).{file_type}"
if file_name in file_names:
counter += 1
else:
break
files.append((file_name, file_id))
file_names.add(file_name)
return files
def _find_dirs(self, soup):
dirs = []
found = soup.find_all("div", {"class": "alert", "role": "alert"})
if found:
return []
found = soup.find_all("a", {"class": "il_ContainerItemTitle", "href": self.DIR_RE})
for element in found:
dir_name = element.string.strip().replace("/", ".")
ref_id = re.search(self.DIR_RE, element.get("href")).group(1)
dir_id = f"fold_{ref_id}"
dirs.append((dir_name, dir_id))
return dirs

PFERD/ilias/__init__.py (new file)

@@ -0,0 +1,10 @@
"""
Synchronizing files from ILIAS instances (https://www.ilias.de/).
"""
from .authenticators import IliasAuthenticator, KitShibbolethAuthenticator
from .crawler import (IliasCrawler, IliasCrawlerEntry, IliasDirectoryFilter,
IliasElementType)
from .downloader import (IliasDownloader, IliasDownloadInfo,
IliasDownloadStrategy, download_everything,
download_modified_or_new)

PFERD/ilias/authenticators.py (new file)

@@ -0,0 +1,131 @@
"""
Authenticators that can obtain proper ILIAS session cookies.
"""
import abc
import logging
from typing import Optional
import bs4
import requests
from ..authenticators import TfaAuthenticator, UserPassAuthenticator
from ..utils import soupify
LOGGER = logging.getLogger(__name__)
class IliasAuthenticator(abc.ABC):
# pylint: disable=too-few-public-methods
"""
An authenticator that logs an existing requests session into an ILIAS
account.
"""
@abc.abstractmethod
def authenticate(self, sess: requests.Session) -> None:
"""
Log a requests session into this authenticator's ILIAS account.
"""
class KitShibbolethAuthenticator(IliasAuthenticator):
# pylint: disable=too-few-public-methods
"""
Authenticate via KIT's shibboleth system.
"""
def __init__(self, username: Optional[str] = None, password: Optional[str] = None) -> None:
self._auth = UserPassAuthenticator("KIT ILIAS Shibboleth", username, password)
self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth")
def authenticate(self, sess: requests.Session) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
LOGGER.debug("Begin authentication process with ILIAS")
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
}
soup = soupify(sess.post(url, data=data))
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "form2", "method": "post"})
action = form["action"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth using credentials")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_username": self._auth.username,
"j_password": self._auth.password,
}
soup = soupify(sess.post(url, data=data))
if self._tfa_required(soup):
soup = self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
print("Incorrect credentials.")
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
LOGGER.debug("Redirect back to ILIAS with login information")
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
sess.post(url, data=data)
def _authenticate_tfa(
self,
session: requests.Session,
soup: bs4.BeautifulSoup
) -> bs4.BeautifulSoup:
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth with TFA token")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": self._tfa_auth.get_token()
}
return soupify(session.post(url, data=data))
@staticmethod
def _login_successful(soup: bs4.BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None
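
As a sketch, the new authenticator is handed an existing requests session; credentials and, if required, the TFA token are prompted for on demand:

```py
import requests

from PFERD.ilias import KitShibbolethAuthenticator

authenticator = KitShibbolethAuthenticator()  # username/password may also be passed directly
session = requests.Session()

# Performs the Shibboleth dance (including TFA, if needed) and leaves the
# resulting ILIAS cookies in the session.
authenticator.authenticate(session)
```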

PFERD/ilias/crawler.py (new file)

@@ -0,0 +1,591 @@
"""
Contains an ILIAS crawler alongside helper functions.
"""
import datetime
import json
import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
urlunsplit)
import bs4
import requests
from ..errors import FatalException
from ..logging import PrettyLogger
from ..utils import soupify
from .authenticators import IliasAuthenticator
from .date_demangler import demangle_date
from .downloader import IliasDownloadInfo
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class IliasElementType(Enum):
"""
The type of an ilias element.
"""
REGULAR_FOLDER = "REGULAR_FOLDER"
VIDEO_FOLDER = "VIDEO_FOLDER"
EXERCISE_FOLDER = "EXERCISE_FOLDER"
REGULAR_FILE = "REGULAR_FILE"
VIDEO_FILE = "VIDEO_FILE"
FORUM = "FORUM"
EXTERNAL_LINK = "EXTERNAL_LINK"
IliasDirectoryFilter = Callable[[Path, IliasElementType], bool]
class IliasCrawlerEntry:
# pylint: disable=too-few-public-methods
"""
An ILIAS crawler entry used internally to find, catalogue and recursively crawl elements.
"""
def __init__(
self,
path: Path,
url: Union[str, Callable[[], Optional[str]]],
entry_type: IliasElementType,
modification_date: Optional[datetime.datetime]
):
self.path = path
if isinstance(url, str):
str_url = url
self.url: Callable[[], Optional[str]] = lambda: str_url
else:
self.url = url
self.entry_type = entry_type
self.modification_date = modification_date
def to_download_info(self) -> Optional[IliasDownloadInfo]:
"""
Converts this crawler entry to an IliasDownloadInfo, if possible.
This method will only succeed for *File* types.
"""
if self.entry_type in [IliasElementType.REGULAR_FILE, IliasElementType.VIDEO_FILE]:
return IliasDownloadInfo(self.path, self.url, self.modification_date)
return None
class IliasCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for ILIAS.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
base_url: str,
session: requests.Session,
authenticator: IliasAuthenticator,
dir_filter: IliasDirectoryFilter
):
"""
Create a new ILIAS crawler.
"""
self._base_url = base_url
self._session = session
self._authenticator = authenticator
self.dir_filter = dir_filter
@staticmethod
def _url_set_query_param(url: str, param: str, value: str) -> str:
"""
Set a query parameter in an url, overwriting existing ones with the same name.
"""
scheme, netloc, path, query, fragment = urlsplit(url)
query_parameters = parse_qs(query)
query_parameters[param] = [value]
new_query_string = urlencode(query_parameters, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
"""
Starts the crawl process for a course, yielding a list of elements to (potentially)
download.
Arguments:
course_id {str} -- the course id
Raises:
FatalException: if an unrecoverable error occurs or the course id is not valid
"""
# Start crawling at the given course
root_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
if not self._is_course_id_valid(root_url, course_id):
raise FatalException(
"Invalid course id? The URL the server returned did not contain my id."
)
# And treat it as a folder
entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), root_url)
return self._iterate_entries_to_download_infos(entries)
def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
response: requests.Response = self._session.get(root_url)
return course_id in response.url
def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
"""
Crawls the ILIAS personal desktop (and every subelements that can be reached from there).
Raises:
FatalException: if an unrecoverable error occurs
"""
entries: List[IliasCrawlerEntry] = self._crawl_folder(
Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI"
)
return self._iterate_entries_to_download_infos(entries)
def _iterate_entries_to_download_infos(
self,
entries: List[IliasCrawlerEntry]
) -> List[IliasDownloadInfo]:
result: List[IliasDownloadInfo] = []
entries_to_process: List[IliasCrawlerEntry] = entries.copy()
while len(entries_to_process) > 0:
entry = entries_to_process.pop()
if entry.entry_type == IliasElementType.EXTERNAL_LINK:
PRETTY.not_searching(entry.path, "external link")
continue
if entry.entry_type == IliasElementType.FORUM:
PRETTY.not_searching(entry.path, "forum")
continue
if not self.dir_filter(entry.path, entry.entry_type):
PRETTY.not_searching(entry.path, "user filter")
continue
download_info = entry.to_download_info()
if download_info is not None:
result.append(download_info)
continue
url = entry.url()
if url is None:
PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it")
continue
PRETTY.searching(entry.path)
if entry.entry_type == IliasElementType.EXERCISE_FOLDER:
entries_to_process += self._crawl_exercises(entry.path, url)
continue
if entry.entry_type == IliasElementType.REGULAR_FOLDER:
entries_to_process += self._crawl_folder(entry.path, url)
continue
if entry.entry_type == IliasElementType.VIDEO_FOLDER:
entries_to_process += self._crawl_video_directory(entry.path, url)
continue
return result
def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl all files in a folder-like element.
"""
soup = self._get_page(url, {})
result: List[IliasCrawlerEntry] = []
# Fetch all links and throw them to the general interpreter
links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
for link in links:
abs_url = self._abs_url_from_link(link)
element_path = Path(folder_path, link.getText().strip())
element_type = self._find_type_from_link(element_path, link, abs_url)
if element_type == IliasElementType.REGULAR_FILE:
result += self._crawl_file(folder_path, link, abs_url)
elif element_type is not None:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
else:
PRETTY.warning(f"Found element without a type at {str(element_path)!r}")
return result
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._base_url, link_tag.get("href"))
@staticmethod
def _find_type_from_link(
path: Path,
link_element: bs4.Tag,
url: str
) -> Optional[IliasElementType]:
"""
Decides which sub crawler to use for a given top level element.
"""
parsed_url = urlparse(url)
LOGGER.debug("Parsed url: %r", parsed_url)
# file URLs contain "target=file"
if "target=file_" in parsed_url.query:
return IliasElementType.REGULAR_FILE
# Skip forums
if "cmd=showThreads" in parsed_url.query:
return IliasElementType.FORUM
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query:
return IliasCrawler._find_type_from_folder_like(link_element, url)
PRETTY.warning(
"Got unknown element type in switch. I am not sure what horror I found on the"
f" ILIAS page. The element was at {str(path)!r} and it is {link_element!r})"
)
return None
@staticmethod
def _find_type_from_folder_like(link_element: bs4.Tag, url: str) -> Optional[IliasElementType]:
"""
Try crawling something that looks like a folder.
"""
# pylint: disable=too-many-return-statements
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
found_parent = parent
break
if found_parent is None:
PRETTY.warning(f"Could not find element icon for {url!r}")
return None
# Find the small descriptive icon to figure out the type
img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
PRETTY.warning(f"Could not find image tag for {url!r}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE_FOLDER
if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.EXTERNAL_LINK
if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM
return IliasElementType.REGULAR_FOLDER
@staticmethod
def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasCrawlerEntry]:
"""
Crawls a file.
"""
# Files have a list of properties (type, modification date, size, etc.)
# In a series of divs.
# Find the parent containing all those divs, so we can filter our what we need
properties_parent: bs4.Tag = link_element.findParent(
"div", {"class": lambda x: "il_ContainerListItem" in x}
).select_one(".il_ItemProperties")
# The first one is always the filetype
file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()
# The rest does not have a stable order. Grab the whole text and reg-ex the date
# out of it
all_properties_text = properties_parent.getText().strip()
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
all_properties_text
)
if modification_date_match is None:
modification_date = None
PRETTY.warning(f"Could not extract start date from {all_properties_text!r}")
else:
modification_date_str = modification_date_match.group(1)
modification_date = demangle_date(modification_date_str)
# Grab the name from the link text
name = link_element.getText()
full_path = Path(path, name + "." + file_type)
return [
IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date)
]
def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl the video overview site.
"""
initial_soup = self._get_page(url, {})
# The page is actually emtpy but contains a much needed token in the link below.
# That token can be used to fetch the *actual* video listing
content_link: bs4.Tag = initial_soup.select_one("#tab_series a")
# Fetch the actual video listing. The given parameters return all videos (max 800)
# in a standalone html page
video_list_soup = self._get_page(
self._abs_url_from_link(content_link),
{"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
# If we find a page selected, we probably need to respect pagination
if self._is_paginated_video_page(video_list_soup):
second_stage_url = self._abs_url_from_link(content_link)
return self._crawl_paginated_video_directory(
video_dir_path, video_list_soup, second_stage_url
)
return self._crawl_video_directory_second_stage(video_dir_path, video_list_soup)
@staticmethod
def _is_paginated_video_page(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id=re.compile(r"tab_page_sel.+")) is not None
def _crawl_paginated_video_directory(
self,
video_dir_path: Path,
paged_video_list_soup: bs4.BeautifulSoup,
second_stage_url: str
) -> List[IliasCrawlerEntry]:
LOGGER.info("Found paginated video page, trying 800 elements")
# Try to find the table id. This can be used to build the query parameter indicating
# you want 800 elements
table_element: bs4.Tag = paged_video_list_soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
if table_element is None:
PRETTY.warning(
"Could not increase elements per page (table not found)."
" Some might not be crawled!"
)
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if match is None:
PRETTY.warning(
"Could not increase elements per page (table id not found)."
" Some might not be crawled!"
)
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
table_id = match.group(1)
extended_video_page = self._get_page(
second_stage_url,
{f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
if self._is_paginated_video_page(extended_video_page):
PRETTY.warning(
"800 elements do not seem to be enough (or I failed to fetch that many)."
" I will miss elements."
)
return self._crawl_video_directory_second_stage(video_dir_path, extended_video_page)
def _crawl_video_directory_second_stage(
self,
video_dir_path: Path,
video_list_soup: bs4.BeautifulSoup
) -> List[IliasCrawlerEntry]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
direct_download_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Download\s*")
)
# Video start links are marked with an "Abspielen" link
video_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*")
)
results: List[IliasCrawlerEntry] = []
# We can download everything directly!
if len(direct_download_links) == len(video_links):
for link in direct_download_links:
results += self._crawl_single_video(video_dir_path, link, True)
else:
for link in video_links:
results += self._crawl_single_video(video_dir_path, link, False)
return results
def _crawl_single_video(
self,
parent_path: Path,
link: bs4.Tag,
direct_download: bool
) -> List[IliasCrawlerEntry]:
"""
Crawl a single video based on its "Abspielen" link from the video listing.
"""
# The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)"
).getText().strip()
modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
title = link.parent.parent.parent.select_one(
"td.std:nth-child(3)"
).getText().strip()
title += ".mp4"
video_path: Path = Path(parent_path, title)
video_url = self._abs_url_from_link(link)
# The video had a direct download button we can use instead
if direct_download:
LOGGER.debug("Using direct download for video %r", str(video_path))
return [IliasCrawlerEntry(
video_path, video_url, IliasElementType.VIDEO_FILE, modification_time
)]
return [IliasCrawlerEntry(
video_path,
self._crawl_video_url_from_play_link(video_url),
IliasElementType.VIDEO_FILE,
modification_time
)]
def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Optional[str]]:
def inner() -> Optional[str]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
video_page_soup = soupify(self._session.get(play_url))
regex: re.Pattern = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(video_page_soup))
if json_match is None:
PRETTY.warning(f"Could not find json stream info for {play_url!r}")
return None
json_str = json_match.group(1)
# parse it
json_object = json.loads(json_str)
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return video_url
return inner
def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl files offered for download in exercises.
"""
soup = self._get_page(url, {})
results: List[IliasCrawlerEntry] = []
# Each assignment is in an accordion container
assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer")
for container in assignment_containers:
# Fetch the container name out of the header to use it in the path
container_name = container.select_one(".ilAssignmentHeader").getText().strip()
# Find all download links in the container (this will contain all the files)
files: List[bs4.Tag] = container.findAll(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
text="Download"
)
LOGGER.debug("Found exercise container %r", container_name)
# Grab each file as you now have the link
for file_link in files:
# Two divs, side by side. Left is the name, right is the link ==> get left
# sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip()
url = self._abs_url_from_link(file_link)
LOGGER.debug("Found file %r at %r", file_name, url)
results.append(IliasCrawlerEntry(
Path(element_path, container_name, file_name),
url,
IliasElementType.REGULAR_FILE,
None # We do not have any timestamp
))
return results
def _get_page(self, url: str, params: Dict[str, Any]) -> bs4.BeautifulSoup:
"""
Fetches a page from ILIAS, authenticating when needed.
"""
LOGGER.debug("Fetching %r", url)
response = self._session.get(url, params=params)
content_type = response.headers["content-type"]
if not content_type.startswith("text/html"):
raise FatalException(
f"Invalid content type {content_type} when crawling ilias page"
" {url!r} with {params!r}"
)
soup = soupify(response)
if self._is_logged_in(soup):
return soup
LOGGER.info("Not authenticated, changing that...")
self._authenticator.authenticate(self._session)
return self._get_page(url, params)
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
# Normal ILIAS pages
userlog = soup.find("li", {"id": "userlog"})
if userlog is not None:
LOGGER.debug("Auth: Found #userlog")
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
LOGGER.debug("Auth: Found #tbl_xoct.+")
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
LOGGER.debug("Auth: Found #playerContainer")
return True
return False
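
A sketch of driving the crawler. The base URL is the KIT ILIAS instance used throughout this diff; the course id and the directory filter are placeholders:

```py
from pathlib import Path

import requests

from PFERD.ilias import IliasCrawler, IliasElementType, KitShibbolethAuthenticator

def accept_everything(path: Path, element_type: IliasElementType) -> bool:
    # An IliasDirectoryFilter decides which folder-like elements get crawled.
    return True

session = requests.Session()
authenticator = KitShibbolethAuthenticator()
crawler = IliasCrawler(
    "https://ilias.studium.kit.edu", session, authenticator, accept_everything
)

# Crawl a single course by its id ...
infos = crawler.crawl_course("1234567")
# ... or everything reachable from the personal desktop.
infos += crawler.crawl_personal_desktop()
```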

PFERD/ilias/date_demangler.py (new file)

@@ -0,0 +1,51 @@
"""
Helper methods to demangle an ILIAS date.
"""
import datetime
import locale
import logging
import re
from typing import Optional
from ..logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
def demangle_date(date: str) -> Optional[datetime.datetime]:
"""
Demangle a given date in one of the following formats:
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon.yyyy, HH:MM
"""
saved = locale.setlocale(locale.LC_ALL)
try:
try:
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
except locale.Error:
PRETTY.warning(
"Could not set language to german. Assuming you use english everywhere."
)
date = re.sub(r"\s+", " ", date)
date = re.sub("Gestern|Yesterday", _yesterday().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I)
return datetime.datetime.strptime(date, "%d. %b %Y, %H:%M")
except ValueError:
PRETTY.warning(f"Could not parse date {date!r}")
return None
finally:
locale.setlocale(locale.LC_ALL, saved)
def _yesterday() -> datetime.date:
return datetime.date.today() - datetime.timedelta(days=1)
def _tomorrow() -> datetime.date:
return datetime.date.today() + datetime.timedelta(days=1)
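
A few illustrative calls to demangle_date (results depend on the current date and on whether the German locale is available):

```py
from PFERD.ilias.date_demangler import demangle_date

demangle_date("08. Mai 2020, 21:52")  # datetime(2020, 5, 8, 21, 52) with a German locale
demangle_date("Heute, 14:30")         # "Heute"/"Today" is replaced by today's date first
demangle_date("not a date")           # None, after logging a warning
```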

PFERD/ilias/downloader.py (new file)

@@ -0,0 +1,146 @@
"""Contains a downloader for ILIAS."""
import datetime
import logging
from pathlib import Path, PurePath
from typing import Callable, List, Optional, Union
import bs4
import requests
from ..logging import PrettyLogger
from ..organizer import Organizer
from ..tmp_dir import TmpDir
from ..transform import Transformable
from ..utils import soupify, stream_to_path
from .authenticators import IliasAuthenticator
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class ContentTypeException(Exception):
"""Thrown when the content type of the ilias element can not be handled."""
class IliasDownloadInfo(Transformable):
"""
This class describes a single file to be downloaded.
"""
def __init__(
self,
path: PurePath,
url: Union[str, Callable[[], Optional[str]]],
modifcation_date: Optional[datetime.datetime]
):
super().__init__(path)
if isinstance(url, str):
string_url = url
self.url: Callable[[], Optional[str]] = lambda: string_url
else:
self.url = url
self.modification_date = modifcation_date
IliasDownloadStrategy = Callable[[Organizer, IliasDownloadInfo], bool]
def download_everything(organizer: Organizer, info: IliasDownloadInfo) -> bool:
# pylint: disable=unused-argument
"""
Accepts everything.
"""
return True
def download_modified_or_new(organizer: Organizer, info: IliasDownloadInfo) -> bool:
"""
Accepts new files or files with a more recent modification date.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists() or info.modification_date is None:
return True
resolved_mod_time_seconds = resolved_file.stat().st_mtime
# Download if the info is newer
if info.modification_date.timestamp() > resolved_mod_time_seconds:
return True
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
return False
class IliasDownloader:
# pylint: disable=too-many-arguments
"""A downloader for ILIAS."""
def __init__(
self,
tmp_dir: TmpDir,
organizer: Organizer,
session: requests.Session,
authenticator: IliasAuthenticator,
strategy: IliasDownloadStrategy,
):
"""
Create a new IliasDownloader.
"""
self._tmp_dir = tmp_dir
self._organizer = organizer
self._session = session
self._authenticator = authenticator
self._strategy = strategy
def download_all(self, infos: List[IliasDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: IliasDownloadInfo) -> None:
"""
Download a file from ILIAS.
Retries authentication until eternity if it could not fetch the file.
"""
LOGGER.debug("Downloading %r", info)
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
tmp_file = self._tmp_dir.new_path()
while not self._try_download(info, tmp_file):
LOGGER.info("Retrying download: %r", info)
self._authenticator.authenticate(self._session)
self._organizer.accept_file(tmp_file, info.path)
def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
url = info.url()
if url is None:
PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/")
return True
with self._session.get(url, stream=True) as response:
content_type = response.headers["content-type"]
if content_type.startswith("text/html"):
if self._is_logged_in(soupify(response)):
raise ContentTypeException("Attempting to download a web page, not a file")
return False
# Yay, we got the file :)
stream_to_path(response, target, info.path.name)
return True
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
userlog = soup.find("li", {"id": "userlog"})
return userlog is not None
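
The downloader is then fed the crawler's results; again a sketch, assuming tmp_dir and organizer already exist:

```py
from PFERD.ilias import IliasDownloader, download_modified_or_new

# tmp_dir: TmpDir, organizer: Organizer -- assumed to already exist;
# session and authenticator are the same objects used for crawling.
downloader = IliasDownloader(
    tmp_dir, organizer, session, authenticator, download_modified_or_new
)

# Downloads every file, re-authenticating and retrying whenever ILIAS serves a
# login page instead of the file.
downloader.download_all(infos)
```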

PFERD/ilias_authenticators.py (deleted)

@@ -1,177 +0,0 @@
# This file is called IliasAuthenticators because there are multiple mechanisms
# for authenticating with Ilias (even though only the Shibboleth is currently
# implemented). Most of what the ShibbolethAuthenticator currently does is
# not Shibboleth specific; this mess would have to be cleaned up before
# actually implementing any other authentication method.
#
# I think the only other method is the password prompt when clicking the log in
# button.
import getpass
import http.cookiejar
import logging
import time
import bs4
import requests
from .utils import ContentTypeException, stream_to_path
__all__ = ["ShibbolethAuthenticator"]
logger = logging.getLogger(__name__)
class ShibbolethAuthenticator:
ILIAS_GOTO = "https://ilias.studium.kit.edu/goto.php"
ALLOWED_CONTENT_TYPES = [
"application/pdf",
"application/zip",
"application/msword",
"application/vnd.wolfram.nb",
"text/xml",
"text/plain",
"image/jpeg",
"image/png",
]
def __init__(self, cookie_file) -> None:
# Because LWPCookieJar insists on the path being str-like instead of
# Path-like.
cookie_file = str(cookie_file)
cookies = http.cookiejar.LWPCookieJar(cookie_file)
try:
logger.info(f"Loading old cookies from {cookie_file!r}")
cookies.load(ignore_discard=True)
except (FileNotFoundError, http.cookiejar.LoadError):
logger.warn(f"No (valid) cookie file found at {cookie_file!r}, ignoring...")
self._session = requests.Session()
self._session.cookies = cookies
def _authenticate(self):
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
logger.debug("Begin authentication process with ILIAS")
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
}
r = self._session.post(url, data=data)
soup = bs4.BeautifulSoup(r.text, "html.parser")
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "form2", "method": "post"})
action = form["action"]
print("Please enter Shibboleth credentials.")
username = getpass.getpass(prompt="Username: ")
password = getpass.getpass(prompt="Password: ")
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
logger.debug("Attempt to log in to Shibboleth using credentials")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
}
r = self._session.post(url, data=data)
soup = bs4.BeautifulSoup(r.text, "html.parser")
if not self._login_successful(soup):
print("Incorrect credentials.")
# Saving progress
logger.info("Saving cookies (successfully authenticated with Shibboleth)")
self._session.cookies.save(ignore_discard=True)
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
logger.debug("Redirect back to ILIAS with login information")
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
self._session.post(url, data=data)
# Saving progress
logger.info("Saving cookies (successfully authenticated with ILIAS)")
self._session.cookies.save(ignore_discard=True)
def _login_successful(self, soup):
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
def _is_logged_in(self, soup):
userlog = soup.find("li", {"id": "userlog"})
return userlog is not None
def get_webpage(self, object_id):
params = {"target": object_id}
while True:
logger.debug(f"Getting {self.ILIAS_GOTO} {params}")
r = self._session.get(self.ILIAS_GOTO, params=params)
soup = bs4.BeautifulSoup(r.text, "html.parser")
if self._is_logged_in(soup):
return soup
else:
logger.info("Not logged in, authenticating...")
self._authenticate()
def get_webpage_by_refid(self, ref_id):
return self.get_webpage(f"fold_{ref_id}")
def _download(self, url, params, to_path):
with self._session.get(url, params=params, stream=True) as r:
content_type = r.headers["content-type"]
if content_type in self.ALLOWED_CONTENT_TYPES:
# Yay, we got the file :)
stream_to_path(r, to_path)
return True
elif content_type == "text/html":
# Dangit, we're probably not logged in.
soup = bs4.BeautifulSoup(r.text, "html.parser")
if self._is_logged_in(soup):
raise ContentTypeException(
"Attempting to download a web page, not a file")
return False
else:
# What *did* we get?
raise ContentTypeException(
f"Unknown file of type {content_type}")
def download_file(self, file_id, to_path):
params = {"target": file_id}
while True:
success = self._download(self.ILIAS_GOTO, params, to_path)
if success:
return
else:
logger.info("Not logged in, authenticating...")
self._authenticate()

41
PFERD/location.py Normal file

@ -0,0 +1,41 @@
"""
Contains a Location class for objects with an inherent path.
"""
from pathlib import Path, PurePath
class ResolveException(Exception):
"""An exception while resolving a file."""
# TODO take care of this when doing exception handling
class Location:
"""
An object that has an inherent path.
"""
def __init__(self, path: Path):
self._path = path.resolve()
@property
def path(self) -> Path:
"""
This object's location.
"""
return self._path
def resolve(self, target: PurePath) -> Path:
"""
Resolve a file relative to the path of this location.
Raises a [ResolveException] if the file is outside the given directory.
"""
absolute_path = self.path.joinpath(target).resolve()
# TODO Make this less inefficient
if self.path not in absolute_path.parents:
raise ResolveException(f"Path {target} is not inside directory {self.path}")
return absolute_path
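For reference, a quick sketch of how the containment check above behaves (the directory and file names are made up):

```py
from pathlib import Path, PurePath

from PFERD.location import Location

loc = Location(Path("sync"))
loc.resolve(PurePath("folder/file.pdf"))  # -> <cwd>/sync/folder/file.pdf
loc.resolve(PurePath("../escape.pdf"))    # raises ResolveException
```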

157
PFERD/logging.py Normal file

@ -0,0 +1,157 @@
"""
Contains a few logger utility functions and implementations.
"""
import logging
from typing import Optional
from rich._log_render import LogRender
from rich.console import Console
from rich.style import Style
from rich.text import Text
from rich.theme import Theme
from .utils import PathLike, to_path
STYLE = "{"
FORMAT = "[{levelname:<7}] {message}"
DATE_FORMAT = "%F %T"
def enable_logging(name: str = "PFERD", level: int = logging.INFO) -> None:
"""
Enable and configure logging via the logging module.
"""
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(RichLoggingHandler(level=level))
# This should be logged by our own handler, and not the root logger's
# default handler, so we don't pass it on to the root logger.
logger.propagate = False
class RichLoggingHandler(logging.Handler):
"""
A logging handler that uses rich for highlighting
"""
def __init__(self, level: int) -> None:
super().__init__(level=level)
self.console = Console(theme=Theme({
"logging.level.warning": Style(color="yellow")
}))
self._log_render = LogRender(show_level=True, show_time=False, show_path=False)
def emit(self, record: logging.LogRecord) -> None:
"""
Invoked by logging.
"""
log_style = f"logging.level.{record.levelname.lower()}"
message = self.format(record)
level = Text()
level.append(record.levelname, log_style)
message_text = Text.from_markup(message)
self.console.print(
self._log_render(
self.console,
[message_text],
level=level,
)
)
class PrettyLogger:
"""
A logger that prints some specially formatted log messages in color.
"""
def __init__(self, logger: logging.Logger) -> None:
self.logger = logger
@staticmethod
def _format_path(path: PathLike) -> str:
return repr(str(to_path(path)))
def error(self, message: str) -> None:
"""
Print an error message indicating some operation fatally failed.
"""
self.logger.error(
f"[bold red]{message}[/bold red]"
)
def warning(self, message: str) -> None:
"""
Print a warning message indicating some operation failed, but the error can be recovered
or ignored.
"""
self.logger.warning(
f"[bold yellow]{message}[/bold yellow]"
)
def modified_file(self, path: PathLike) -> None:
"""
An existing file has changed.
"""
self.logger.info(
f"[bold magenta]Modified {self._format_path(path)}.[/bold magenta]"
)
def new_file(self, path: PathLike) -> None:
"""
A new file has been downloaded.
"""
self.logger.info(
f"[bold green]Created {self._format_path(path)}.[/bold green]"
)
def ignored_file(self, path: PathLike, reason: str) -> None:
"""
File was not downloaded or modified.
"""
self.logger.info(
f"[dim]Ignored {self._format_path(path)} "
f"([/dim]{reason}[dim]).[/dim]"
)
def searching(self, path: PathLike) -> None:
"""
A crawler searches a particular object.
"""
self.logger.info(f"Searching {self._format_path(path)}")
def not_searching(self, path: PathLike, reason: str) -> None:
"""
A crawler does not search a particular object.
"""
self.logger.info(
f"[dim]Not searching {self._format_path(path)} "
f"([/dim]{reason}[dim]).[/dim]"
)
def starting_synchronizer(
self,
target_directory: PathLike,
synchronizer_name: str,
subject: Optional[str] = None,
) -> None:
"""
A special message marking that a synchronizer has been started.
"""
subject_str = f"{subject} " if subject else ""
self.logger.info("")
self.logger.info((
f"[bold cyan]Synchronizing "
f"{subject_str}to {self._format_path(target_directory)} "
f"using the {synchronizer_name} synchronizer.[/bold cyan]"
))


@ -1,108 +0,0 @@
# Norberts Prog-Tuts
import logging
import pathlib
import re
import zipfile
import bs4
import requests
from .organizer import Organizer
from .utils import rename, stream_to_path, PrettyLogger
__all__ = ["Norbert"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Norbert:
BASE_URL = "https://studwww.informatik.kit.edu/~s_blueml/"
LINK_RE = re.compile(r"^progtut/.*/(.*\.zip)$")
def __init__(self, base_path):
self.base_path = base_path
self._session = requests.Session()
def synchronize(self, to_dir, transform=lambda x: x, unzip=lambda _: True):
pretty.starting_synchronizer(to_dir, "Norbert")
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
files = self._crawl()
self._download(orga, files, transform, unzip)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self):
url = self.BASE_URL
r = self._session.get(url)
# replace undecodeable characters with a placeholder
#text = r.raw.decode("utf-8", "replace")
text = r.text
soup = bs4.BeautifulSoup(text, "html.parser")
files = []
for found in soup.find_all("a", href=self.LINK_RE):
url = found["href"]
full_url = self.BASE_URL + url
filename = re.search(self.LINK_RE, url).group(1)
path = pathlib.PurePath(filename)
logger.debug(f"Found zip file {filename} at {full_url}")
files.append((path, full_url))
return files
def _download(self, orga, files, transform, unzip):
for path, url in sorted(files):
# Yes, we want the zip file contents
if unzip(path):
logger.debug(f"Downloading and unzipping {path}")
zip_path = rename(path, path.stem)
# Download zip file
temp_file = orga.temp_file()
self._download_zip(url, temp_file)
# Search the zip file for files to extract
temp_dir = orga.temp_dir()
with zipfile.ZipFile(temp_file, "r") as zf:
for info in zf.infolist():
# Only interested in the files themselves, the directory
# structure is created automatically by orga.add_file()
if info.is_dir():
continue
file_path = zip_path / pathlib.PurePath(info.filename)
logger.debug(f"Found {info.filename} at path {file_path}")
new_path = transform(file_path)
if new_path is not None:
# Extract to temp file and add, the usual deal
temp_file = orga.temp_file()
extracted_path = zf.extract(info, temp_dir)
extracted_path = pathlib.Path(extracted_path)
orga.add_file(extracted_path, new_path)
# No, we only want the zip file itself
else:
logger.debug(f"Only downloading {path}")
new_path = transform(path)
if new_path is not None:
temp_file = orga.temp_file()
self._download_zip(url, temp_file)
orga.add_file(temp_file, new_path)
def _download_zip(self, url, to_path):
with self._session.get(url, stream=True) as r:
stream_to_path(r, to_path)


@ -1,151 +1,123 @@
"""A simple helper for managing downloaded files.
An organizer is bound to a single directory.
"""
import filecmp
import logging
import pathlib
import shutil
from pathlib import Path, PurePath
from typing import List, Set
from . import utils
from .location import Location
from .logging import PrettyLogger
from .utils import prompt_yes_no
__all__ = ["Organizer"]
logger = logging.getLogger(__name__)
pretty = utils.PrettyLogger(logger)
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class Organizer:
def __init__(self, base_dir, sync_dir):
"""
base_dir - the .tmp directory will be created here
sync_dir - synced files will be moved here
Both are expected to be concrete pathlib paths.
"""
self._base_dir = base_dir
self._sync_dir = sync_dir
class FileAcceptException(Exception):
"""An exception while accepting a file."""
self._temp_dir = pathlib.Path(self._base_dir, ".tmp")
self._temp_nr = 0
# check if base/sync dir exist?
class Organizer(Location):
"""A helper for managing downloaded files."""
self._added_files = set()
def __init__(self, path: Path):
"""Create a new organizer for a given path."""
super().__init__(path)
self._known_files: Set[Path] = set()
def clean_temp_dir(self):
if self._temp_dir.exists():
shutil.rmtree(self._temp_dir)
self._temp_dir.mkdir(exist_ok=True)
logger.debug(f"Cleaned temp dir: {self._temp_dir}")
# Keep the root dir
self._known_files.add(path.resolve())
def temp_dir(self):
nr = self._temp_nr
self._temp_nr += 1
temp_dir = pathlib.Path(self._temp_dir, f"{nr:08}").resolve()
logger.debug(f"Produced new temp dir: {temp_dir}")
return temp_dir
def accept_file(self, src: Path, dst: PurePath) -> None:
"""Move a file to this organizer and mark it."""
src_absolute = src.resolve()
dst_absolute = self.resolve(dst)
def temp_file(self):
# generate the path to a new temp file in base_path/.tmp/
# make sure no two paths are the same
nr = self._temp_nr
self._temp_nr += 1
temp_file = pathlib.Path(self._temp_dir, f"{nr:08}.tmp").resolve()
logger.debug(f"Produced new temp file: {temp_file}")
return temp_file
if not src_absolute.exists():
raise FileAcceptException("Source file does not exist")
def add_file(self, from_path, to_path):
if not from_path.exists():
raise utils.FileNotFoundException(f"Could not add file at {from_path}")
if not src_absolute.is_file():
raise FileAcceptException("Source is a directory")
# check if sync_dir/to_path is inside sync_dir?
to_path = pathlib.Path(self._sync_dir, to_path)
LOGGER.debug("Copying %s to %s", src_absolute, dst_absolute)
if to_path.exists() and to_path.is_dir():
if self._prompt_yes_no(f"Overwrite folder {to_path} with file?", default=False):
shutil.rmtree(to_path)
else:
logger.warn(f"Could not add file {to_path}")
if self._is_marked(dst):
PRETTY.warning(f"File {str(dst_absolute)!r} was already written!")
if not prompt_yes_no(f"Overwrite file?", default=False):
PRETTY.ignored_file(dst_absolute, "file was written previously")
return
if to_path.exists():
if filecmp.cmp(from_path, to_path, shallow=False):
pretty.ignored_file(to_path)
# remember path for later reference
self._added_files.add(to_path.resolve())
logger.debug(f"Added file {to_path.resolve()}")
# No further action needed, especially not overwriting symlinks...
# Destination file is directory
if dst_absolute.exists() and dst_absolute.is_dir():
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
shutil.rmtree(dst_absolute)
else:
PRETTY.warning(f"Could not add file {str(dst_absolute)!r}")
return
else:
pretty.modified_file(to_path)
# Destination file exists
if dst_absolute.exists() and dst_absolute.is_file():
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
# Bail out, nothing more to do
PRETTY.ignored_file(dst_absolute, "same file contents")
self.mark(dst)
return
PRETTY.modified_file(dst_absolute)
else:
pretty.new_file(to_path)
PRETTY.new_file(dst_absolute)
# copy the file from from_path to sync_dir/to_path
# If the file being replaced was a symlink, the link itself is overwritten,
# not the file the link points to.
to_path.parent.mkdir(parents=True, exist_ok=True)
from_path.replace(to_path)
logger.debug(f"Moved {from_path} to {to_path}")
# Create parent dir if needed
dst_parent_dir: Path = dst_absolute.parent
dst_parent_dir.mkdir(exist_ok=True, parents=True)
# remember path for later reference, after the new file was written
# This is necessary here because otherwise, resolve() would resolve the symlink too.
self._added_files.add(to_path.resolve())
logger.debug(f"Added file {to_path.resolve()}")
# Move file
shutil.move(str(src_absolute), str(dst_absolute))
def clean_sync_dir(self):
self._clean_dir(self._sync_dir, remove_parent=False)
logger.debug(f"Cleaned sync dir: {self._sync_dir}")
self.mark(dst)
def _clean_dir(self, path, remove_parent=True):
for child in sorted(path.iterdir()):
logger.debug(f"Looking at {child.resolve()}")
if child.is_dir():
self._clean_dir(child, remove_parent=True)
elif child.resolve() not in self._added_files:
if self._prompt_yes_no(f"Delete {child}?", default=False):
child.unlink()
logger.debug(f"Deleted {child}")
def mark(self, path: PurePath) -> None:
"""Mark a file as used so it will not get cleaned up."""
absolute_path = self.resolve(path)
self._known_files.add(absolute_path)
LOGGER.debug("Tracked %s", absolute_path)
if remove_parent:
try:
path.rmdir()
except OSError: # directory not empty
pass
def _is_marked(self, path: PurePath) -> bool:
"""
Checks whether a file is marked.
"""
absolute_path = self.resolve(path)
return absolute_path in self._known_files
def _prompt_yes_no(self, question, default=None):
if default is True:
prompt = "[Y/n]"
elif default is False:
prompt = "[y/N]"
else:
prompt = "[y/n]"
def cleanup(self) -> None:
"""Remove all untracked files in the organizer's dir."""
LOGGER.debug("Deleting all untracked files...")
text = f"{question} {prompt} "
WRONG_REPLY = "Please reply with 'yes'/'y' or 'no'/'n'."
self._cleanup(self.path)
while True:
response = input(text).strip().lower()
if response in {"yes", "ye", "y"}:
return True
elif response in {"no", "n"}:
return False
elif response == "":
if default is None:
print(WRONG_REPLY)
else:
return default
def _cleanup(self, start_dir: Path) -> None:
paths: List[Path] = list(start_dir.iterdir())
# Recursively clean paths
for path in paths:
if path.is_dir():
self._cleanup(path)
else:
print(WRONG_REPLY)
if path.resolve() not in self._known_files:
self._delete_file_if_confirmed(path)
# How to use:
#
# 1. Before downloading any files
# orga = Organizer("/home/user/sync/", "/home/user/sync/bookstore/")
# orga.clean_temp_dir()
#
# 2. Downloading a file
# tempfile = orga.temp_file()
# download_something_to(tempfile)
# orga.add_file(tempfile, "books/douglas_adams/hhgttg"
#
# 3. After downloading all files
# orga.clean_sync_dir()
# orga.clean_temp_dir()
# Delete dir if it was empty and untracked
dir_empty = len(list(start_dir.iterdir())) == 0
if start_dir.resolve() not in self._known_files and dir_empty:
start_dir.rmdir()
@staticmethod
def _delete_file_if_confirmed(path: Path) -> None:
prompt = f"Do you want to delete {path}"
if prompt_yes_no(prompt, False):
path.unlink()

263
PFERD/pferd.py Normal file

@ -0,0 +1,263 @@
"""
Convenience functions for using PFERD.
"""
import logging
from pathlib import Path
from typing import Callable, List, Optional, Union
from .cookie_jar import CookieJar
from .diva import (DivaDownloader, DivaDownloadStrategy, DivaPlaylistCrawler,
diva_download_new)
from .errors import FatalException, swallow_and_print_errors
from .ilias import (IliasAuthenticator, IliasCrawler, IliasDirectoryFilter,
IliasDownloader, IliasDownloadInfo, IliasDownloadStrategy,
KitShibbolethAuthenticator, download_modified_or_new)
from .location import Location
from .logging import PrettyLogger, enable_logging
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import TF, Transform, apply_transform
from .utils import PathLike, to_path
# TODO save known-good cookies as soon as possible
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class Pferd(Location):
# pylint: disable=too-many-arguments
"""
The main entrypoint in your Pferd usage: This class combines a number of
useful shortcuts for running synchronizers in a single interface.
"""
def __init__(
self,
base_dir: Path,
tmp_dir: Path = Path(".tmp"),
test_run: bool = False
):
super().__init__(Path(base_dir))
self._tmp_dir = TmpDir(self.resolve(tmp_dir))
self._test_run = test_run
@staticmethod
def enable_logging() -> None:
"""
Enable and configure logging via the logging module.
"""
enable_logging()
@staticmethod
def _print_transformables(transformables: List[TF]) -> None:
LOGGER.info("")
LOGGER.info("Results of the test run:")
for transformable in transformables:
LOGGER.info(transformable.path)
def _ilias(
self,
target: PathLike,
base_url: str,
crawl_function: Callable[[IliasCrawler], List[IliasDownloadInfo]],
authenticator: IliasAuthenticator,
cookies: Optional[PathLike],
dir_filter: IliasDirectoryFilter,
transform: Transform,
download_strategy: IliasDownloadStrategy,
clean: bool = True
) -> Organizer:
# pylint: disable=too-many-locals
cookie_jar = CookieJar(to_path(cookies) if cookies else None)
session = cookie_jar.create_session()
tmp_dir = self._tmp_dir.new_subdir()
organizer = Organizer(self.resolve(to_path(target)))
crawler = IliasCrawler(base_url, session, authenticator, dir_filter)
downloader = IliasDownloader(tmp_dir, organizer, session, authenticator, download_strategy)
cookie_jar.load_cookies()
info = crawl_function(crawler)
cookie_jar.save_cookies()
transformed = apply_transform(transform, info)
if self._test_run:
self._print_transformables(transformed)
return organizer
downloader.download_all(transformed)
cookie_jar.save_cookies()
if clean:
organizer.cleanup()
return organizer
@swallow_and_print_errors
def ilias_kit(
self,
target: PathLike,
course_id: str,
dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None,
username: Optional[str] = None,
password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
) -> Organizer:
"""
Synchronizes a folder with the ILIAS instance of the KIT.
Arguments:
target {Path} -- the target path to write the data to
course_id {str} -- the id of the main course page (found in the URL after ref_id
when opening the course homepage)
Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level; these directories and all of their content are skipped.
(default: {lambdax:True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
"""
# This authenticator only works with the KIT ilias instance.
authenticator = KitShibbolethAuthenticator(username=username, password=password)
PRETTY.starting_synchronizer(target, "ILIAS", course_id)
return self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_course(course_id),
authenticator=authenticator,
cookies=cookies,
dir_filter=dir_filter,
transform=transform,
download_strategy=download_strategy,
clean=clean,
)
@swallow_and_print_errors
def ilias_kit_personal_desktop(
self,
target: PathLike,
dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None,
username: Optional[str] = None,
password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
) -> Organizer:
"""
Synchronizes a folder with the ILIAS instance of the KIT. This method will crawl the ILIAS
"personal desktop" instead of a single course.
Arguments:
target {Path} -- the target path to write the data to
Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level; these directories and all of their content are skipped.
(default: {lambdax:True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
"""
# This authenticator only works with the KIT ilias instance.
authenticator = KitShibbolethAuthenticator(username=username, password=password)
PRETTY.starting_synchronizer(target, "ILIAS", "Personal Desktop")
return self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_personal_desktop(),
authenticator=authenticator,
cookies=cookies,
dir_filter=dir_filter,
transform=transform,
download_strategy=download_strategy,
clean=clean,
)
@swallow_and_print_errors
def diva_kit(
self,
target: Union[PathLike, Organizer],
playlist_location: str,
transform: Transform = lambda x: x,
download_strategy: DivaDownloadStrategy = diva_download_new,
clean: bool = True
) -> Organizer:
"""
Synchronizes a folder with a DIVA playlist.
Arguments:
target {Union[PathLike, Organizer]} -- The target directory or an existing organizer to use.
playlist_location {str} -- the playlist id or the playlist URL
in the format 'https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271'
Keyword Arguments:
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
download_strategy {DivaDownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {diva_download_new})
clean {bool} -- Whether to clean up when the method finishes.
"""
tmp_dir = self._tmp_dir.new_subdir()
if playlist_location.startswith("http"):
playlist_id = DivaPlaylistCrawler.fetch_id(playlist_link=playlist_location)
else:
playlist_id = playlist_location
if target is None:
PRETTY.starting_synchronizer("None", "DIVA", playlist_id)
raise FatalException("Got 'None' as target directory, aborting")
if isinstance(target, Organizer):
organizer = target
else:
organizer = Organizer(self.resolve(to_path(target)))
PRETTY.starting_synchronizer(organizer.path, "DIVA", playlist_id)
crawler = DivaPlaylistCrawler(playlist_id)
downloader = DivaDownloader(tmp_dir, organizer, download_strategy)
info = crawler.crawl()
transformed = apply_transform(transform, info)
if self._test_run:
self._print_transformables(transformed)
return organizer
downloader.download_all(transformed)
if clean:
organizer.cleanup()
return organizer

126
PFERD/progress.py Normal file

@ -0,0 +1,126 @@
"""
A small progress bar implementation.
"""
import sys
from dataclasses import dataclass
from types import TracebackType
from typing import Optional, Type
import requests
from rich.console import Console, ConsoleOptions, Control, RenderResult
from rich.live_render import LiveRender
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
TextColumn, TimeRemainingColumn,
TransferSpeedColumn)
_progress: Progress = Progress(
TextColumn("[bold blue]{task.fields[name]}", justify="right"),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"",
DownloadColumn(),
"",
TransferSpeedColumn(),
"",
TimeRemainingColumn(),
console=Console(file=sys.stdout)
)
def size_from_headers(response: requests.Response) -> Optional[int]:
"""
Return the size of the download based on the response headers.
Arguments:
response {requests.Response} -- the response
Returns:
Optional[int] -- the size
"""
if "Content-Length" in response.headers:
return int(response.headers["Content-Length"])
return None
@dataclass
class ProgressSettings:
"""
Settings you can pass to customize the progress bar.
"""
name: str
max_size: int
def progress_for(settings: Optional[ProgressSettings]) -> 'ProgressContextManager':
"""
Returns a context manager that displays progress
Returns:
ProgressContextManager -- the progress manager
"""
return ProgressContextManager(settings)
class _OneLineUp(LiveRender):
"""
Render a control code for moving one line upwards.
"""
def __init__(self) -> None:
super().__init__("not rendered")
def __console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
yield Control(f"\r\x1b[1A")
class ProgressContextManager:
"""
A context manager used for displaying progress.
"""
def __init__(self, settings: Optional[ProgressSettings]):
self._settings = settings
self._task_id: Optional[TaskID] = None
def __enter__(self) -> 'ProgressContextManager':
"""Context manager entry function."""
if not self._settings:
return self
_progress.start()
self._task_id = _progress.add_task(
self._settings.name,
total=self._settings.max_size,
name=self._settings.name
)
return self
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Removes the task."""
if self._task_id is None:
return None
_progress.remove_task(self._task_id)
if len(_progress.task_ids) == 0:
# We need to clean up after ourselves, as we were the last one
_progress.stop()
_progress.refresh()
# And we exited, so remove the line above (remove_task leaves one behind)
Console().print(_OneLineUp())
return None
def advance(self, amount: float) -> None:
"""
Advances the progress bar.
"""
if self._task_id is not None:
_progress.advance(self._task_id, amount)
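A hedged usage sketch of this progress API (the file name and sizes here are invented):

```py
from PFERD.progress import ProgressSettings, progress_for

# Display a bar for a hypothetical 16 MiB download.
settings = ProgressSettings(name="lecture-01.mp4", max_size=16 * 1024 * 1024)
with progress_for(settings) as progress:
    # A real download loop would call advance() with the number of bytes written.
    progress.advance(1024 * 1024)
```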


@ -1,111 +0,0 @@
# Fakultät für Mathematik (FfM)
import getpass
import logging
import pathlib
import re
import bs4
import requests
from .organizer import Organizer
from .utils import stream_to_path, PrettyLogger
__all__ = ["Ti"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Ti:
BASE_URL = "http://ti.ira.uka.de/"
FILE_RE = re.compile(r"^.+\.pdf$")
def __init__(self, base_path):
self.base_path = base_path
self._session = requests.Session()
self._credentials = None
def synchronize(self, urlpart, to_dir, transform=lambda x: x,
filter=lambda x: True):
pretty.starting_synchronizer(to_dir, "Ti", urlpart)
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
self._reset_credentials()
available = self._find_available(urlpart)
for name, address in sorted(available.items()):
path = pathlib.PurePath(name)
if filter(path):
self._crawl(urlpart + address, path, orga, transform)
else:
logger.info(f"Skipping {name}/")
orga.clean_sync_dir()
orga.clean_temp_dir()
self._reset_credentials()
def _find_available(self, urlpart):
url = self.BASE_URL + urlpart
r = self._session.get(url)
soup = bs4.BeautifulSoup(r.text, "html.parser")
available = {}
if soup.find(href="./Vorlesung/Vorlesung.php"):
logger.info("Found Folien/")
available["Folien"] = "/Vorlesung/"
if soup.find(href="./Uebungen/Uebungen.php"):
logger.info("Found Blätter/")
available["Blätter"] = "/Uebungen/"
if soup.find(href="./Tutorien/Tutorien.php"):
logger.info("Found Tutorien/")
available["Tutorien"] = "/Tutorien/"
return available
def _crawl(self, urlpart, path, orga, transform):
url = self.BASE_URL + urlpart
r = self._session.get(url)
soup = bs4.BeautifulSoup(r.text, "html.parser")
for filelink in soup.find_all("a", href=self.FILE_RE):
filepath = path / filelink["href"]
fileurl = url + "/" + filelink["href"]
new_path = transform(filepath)
if new_path is None:
continue
logger.debug(f"Transformed from {filepath} to {new_path}")
temp_path = orga.temp_file()
self._download(fileurl, temp_path)
orga.add_file(temp_path, new_path)
def _get_credentials(self):
if self._credentials is None:
print("Please enter Ti credentials.")
username = getpass.getpass(prompt="Username: ")
password = getpass.getpass(prompt="Password: ")
self._credentials = (username, password)
return self._credentials
def _reset_credentials(self):
self._credentials = None
def _download(self, url, to_path):
while True:
username, password = self._get_credentials()
with self._session.get(url, stream=True, auth=(username, password)) as r:
if r.ok:
stream_to_path(r, to_path)
return
else:
print("Incorrect credentials.")
self._reset_credentials()

79
PFERD/tmp_dir.py Normal file

@ -0,0 +1,79 @@
"""Helper functions and classes for temporary folders."""
import logging
import shutil
from pathlib import Path
from types import TracebackType
from typing import Optional, Type
from .location import Location
LOGGER = logging.getLogger(__name__)
class TmpDir(Location):
"""A temporary folder that can create files or nested temp folders."""
def __init__(self, path: Path):
"""Create a new temporary folder for the given path."""
super().__init__(path)
self._counter = 0
self.cleanup()
self.path.mkdir(parents=True, exist_ok=True)
def __str__(self) -> str:
"""Format the folder as a string."""
return f"Folder at {self.path}"
def __enter__(self) -> 'TmpDir':
"""Context manager entry function."""
return self
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Calls cleanup()."""
self.cleanup()
return None
def new_path(self, prefix: Optional[str] = None) -> Path:
"""
Return a unique path inside the directory. Doesn't create a file or
directory.
"""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
LOGGER.debug("Creating temp file %s", name)
return self.resolve(Path(name))
def new_subdir(self, prefix: Optional[str] = None) -> 'TmpDir':
"""
Create a new nested temporary folder and return it.
"""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
sub_path = self.resolve(Path(name))
sub_path.mkdir(parents=True)
LOGGER.debug("Creating temp dir %s at %s", name, sub_path)
return TmpDir(sub_path)
def cleanup(self) -> None:
"""Delete this folder and all contained files."""
LOGGER.debug("Deleting temp folder %s", self.path)
if self.path.resolve().exists():
shutil.rmtree(self.path.resolve())
def _inc_and_get_counter(self) -> int:
"""Get and increment the counter by one."""
counter = self._counter
self._counter += 1
return counter
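A small usage sketch for the temporary-folder helper (the base path is arbitrary):

```py
from pathlib import Path

from PFERD.tmp_dir import TmpDir

with TmpDir(Path(".tmp")) as tmp:
    download_target = tmp.new_path()   # unique path; nothing is created yet
    nested = tmp.new_subdir("videos")  # nested TmpDir, created on disk
# Leaving the with-block calls cleanup() and removes the folder again.
```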

127
PFERD/transform.py Normal file

@ -0,0 +1,127 @@
"""
Transforms let the user define functions to decide where the downloaded files
should be placed locally. They let the user do more advanced things like moving
only files whose names match a regex, or renaming files from one numbering
scheme to another.
"""
from dataclasses import dataclass
from pathlib import PurePath
from typing import Callable, List, Optional, TypeVar
from .utils import PathLike, Regex, to_path, to_pattern
Transform = Callable[[PurePath], Optional[PurePath]]
@dataclass
class Transformable:
"""
An object that can be transformed by a Transform.
"""
path: PurePath
TF = TypeVar("TF", bound=Transformable)
def apply_transform(
transform: Transform,
transformables: List[TF],
) -> List[TF]:
"""
Apply a Transform to multiple Transformables, discarding those that were
not transformed by the Transform.
"""
result: List[TF] = []
for transformable in transformables:
new_path = transform(transformable.path)
if new_path:
transformable.path = new_path
result.append(transformable)
return result
# Transform combinators
keep = lambda path: path
def attempt(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
for transform in args:
result = transform(path)
if result:
return result
return None
return inner
def optionally(transform: Transform) -> Transform:
return attempt(transform, lambda path: path)
def do(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
current = path
for transform in args:
result = transform(current)
if result:
current = result
else:
return None
return current
return inner
def predicate(pred: Callable[[PurePath], bool]) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
if pred(path):
return path
return None
return inner
def glob(pattern: str) -> Transform:
return predicate(lambda path: path.match(pattern))
def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform:
source_path = to_path(source_dir)
target_path = to_path(target_dir)
def inner(path: PurePath) -> Optional[PurePath]:
if source_path in path.parents:
return target_path / path.relative_to(source_path)
return None
return inner
def move(source: PathLike, target: PathLike) -> Transform:
source_path = to_path(source)
target_path = to_path(target)
def inner(path: PurePath) -> Optional[PurePath]:
if path == source_path:
return target_path
return None
return inner
def rename(source: str, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
if path.name == source:
return path.with_name(target)
return None
return inner
def re_move(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(str(path))
if match:
groups = [match.group(0)]
groups.extend(match.groups())
return PurePath(target.format(*groups))
return None
return inner
def re_rename(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(path.name)
if match:
groups = [match.group(0)]
groups.extend(match.groups())
return path.with_name(target.format(*groups))
return None
return inner


@ -1,64 +1,98 @@
import os
import sys
import pathlib
from colorama import Style
from colorama import Fore
"""
A few utility bobs and bits.
"""
__all__ = [
"get_base_dir",
"move",
"rename",
"stream_to_path",
"ContentTypeException",
"FileNotFoundException",
"PrettyLogger",
]
import re
from pathlib import Path, PurePath
from typing import Optional, Tuple, Union
def get_base_dir(script_file):
return pathlib.Path(os.path.dirname(os.path.abspath(script_file)))
import bs4
import requests
def move(path, from_folders, to_folders):
l = len(from_folders)
if path.parts[:l] == from_folders:
return pathlib.PurePath(*to_folders, *path.parts[l:])
from .progress import ProgressSettings, progress_for, size_from_headers
def rename(path, to_name):
return pathlib.PurePath(*path.parts[:-1], to_name)
PathLike = Union[PurePath, str, Tuple[str, ...]]
def stream_to_path(response, to_path, chunk_size=1024**2):
with open(to_path, 'wb') as fd:
for chunk in response.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def isOutputPipe():
"""Returns whether this program's output is attached to a pipe.
def to_path(pathlike: PathLike) -> Path:
"""
return sys.stdout.isatty
Convert a given PathLike into a Path.
"""
if isinstance(pathlike, tuple):
return Path(*pathlike)
return Path(pathlike)
class ContentTypeException(Exception):
pass
class FileNotFoundException(Exception):
pass
Regex = Union[str, re.Pattern]
class PrettyLogger:
def __init__(self, logger):
self.logger = logger
def to_pattern(regex: Regex) -> re.Pattern:
"""
Convert a regex to a re.Pattern.
"""
if isinstance(regex, re.Pattern):
return regex
return re.compile(regex)
def modified_file(self, file_name):
self.logger.info(f"{Fore.MAGENTA}{Style.BRIGHT}Modified {file_name}.{Style.RESET_ALL}")
def new_file(self, file_name):
self.logger.info(f"{Fore.GREEN}{Style.BRIGHT}Created {file_name}.{Style.RESET_ALL}")
def soupify(response: requests.Response) -> bs4.BeautifulSoup:
"""
Wrap a requests response in a bs4 object.
"""
def ignored_file(self, file_name):
self.logger.info(f"{Style.DIM}Ignored {file_name}.{Style.RESET_ALL}")
return bs4.BeautifulSoup(response.text, "html.parser")
def starting_synchronizer(self, target_directory, synchronizer_name, subject=None):
subject_str = f"{subject} " if subject else ""
self.logger.info("")
self.logger.info((
f"{Fore.CYAN}{Style.BRIGHT}Synchronizing {subject_str}to {target_directory}"
f" using the {synchronizer_name} synchronizer.{Style.RESET_ALL}"
))
def stream_to_path(
response: requests.Response,
target: Path,
progress_name: Optional[str] = None,
chunk_size: int = 1024 ** 2
) -> None:
"""
Download a requests response content to a file by streaming it. This
function avoids excessive memory usage when downloading large files. The
chunk_size is in bytes.
If progress_name is None, no progress bar will be shown. Otherwise a progress
bar will appear if the download is bigger than an internal threshold.
"""
with response:
length = size_from_headers(response)
if progress_name and length and int(length) > 1024 * 1024 * 10: # 10 MiB
settings: Optional[ProgressSettings] = ProgressSettings(progress_name, length)
else:
settings = None
with open(target, 'wb') as file_descriptor:
with progress_for(settings) as progress:
for chunk in response.iter_content(chunk_size=chunk_size):
file_descriptor.write(chunk)
progress.advance(len(chunk))
def prompt_yes_no(question: str, default: Optional[bool] = None) -> bool:
"""
Prompts the user a yes/no question and returns their choice.
"""
if default is True:
prompt = "[Y/n]"
elif default is False:
prompt = "[y/N]"
else:
prompt = "[y/n]"
text = f"{question} {prompt} "
wrong_reply = "Please reply with 'yes'/'y' or 'no'/'n'."
while True:
response = input(text).strip().lower()
if response in {"yes", "ye", "y"}:
return True
if response in {"no", "n"}:
return False
if response == "" and default is not None:
return default
print(wrong_reply)
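As a hedged illustration of the new stream_to_path signature above (the URL and file name are placeholders):

```py
from pathlib import Path

import requests

from PFERD.utils import stream_to_path

response = requests.get("https://example.com/big-file.zip", stream=True)
# Passing a progress_name enables the progress bar for large downloads.
stream_to_path(response, Path("big-file.zip"), progress_name="big-file.zip")
```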

137
README.md

@ -4,29 +4,33 @@
## Installation
Ensure that you have at least Python 3.7 installed (3.6 might also work, didn't
test it though).
Ensure that you have at least Python 3.8 installed.
To install PFERD or update your installation to the latest version, run this
wherever you want to install/have installed PFERD:
```
$ pip install git+https://github.com/Garmelon/PFERD@v1.1.4
$ pip install git+https://github.com/Garmelon/PFERD@v2.0.0
```
The use of [venv](https://docs.python.org/3/library/venv.html) is recommended.
## Example setup
In this example, `python3` refers to at least Python 3.7.
In this example, `python3` refers to at least Python 3.8.
If you just want to get started and crawl *your entire ILIAS Desktop* instead
of a given set of courses, please replace `example_config.py` with
`example_config_personal_desktop.py` in all of the instructions below (`curl` call and
`python3` run command).
A full example setup and initial use could look like:
```
$ mkdir Vorlesungen
$ cd Vorlesungen
$ python3 -m venv .
$ . bin/activate
$ pip install git+https://github.com/Garmelon/PFERD@v1.1.4
$ curl -O https://raw.githubusercontent.com/Garmelon/PFERD/master/example_config.py
$ python3 -m venv .venv
$ . .venv/bin/activate
$ pip install git+https://github.com/Garmelon/PFERD@v2.0.0
$ curl -O https://raw.githubusercontent.com/Garmelon/PFERD/v2.0.0/example_config.py
$ python3 example_config.py
$ deactivate
```
@ -34,7 +38,122 @@ $ deactivate
Subsequent runs of the program might look like:
```
$ cd Vorlesungen
$ . bin/activate
$ . .venv/bin/activate
$ python3 example_config.py
$ deactivate
```
## Usage
A PFERD config is a normal python file that starts multiple *synchronizers*
which do all the heavy lifting. While you can create and wire them up manually,
you are encouraged to use the helper methods provided in `PFERD.Pferd`.
The synchronizers take some input arguments specific to their service and a
*transformer*. The transformer receives the computed path of an element in
ILIAS and can return either an output path (so you can rename files or move
them around as you wish) or `None` if you do not want to save the given file.
Additionally the ILIAS synchronizer allows you to define a *crawl filter*. This
filter also receives the computed path as the input, but is only called for
*directories*. If you return `True`, the directory will be crawled and
searched. If you return `False`, the directory will be ignored and nothing in it
will be passed to the transformer.
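For illustration, a hand-written transformer and crawl filter might look like this (a minimal sketch; the function names and folder layout are invented):

```py
from pathlib import PurePath
from typing import Optional

from PFERD.ilias import IliasElementType

def my_transform(path: PurePath) -> Optional[PurePath]:
    # Keep only PDFs and collect them in a flat "Skripte/" folder.
    if path.suffix == ".pdf":
        return PurePath("Skripte", path.name)
    return None  # everything else is skipped

def my_dir_filter(path: PurePath, _type: IliasElementType) -> bool:
    # Never descend into the "Tutorien" folder.
    return path.parts[:1] != ("Tutorien",)
```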
In order to help you with writing your own transformers and filters, PFERD
ships with a few powerful building blocks:
| Method | Description |
|--------|-------------|
| `glob` | Returns a transform that returns `None` if the glob does not match and the unmodified path otherwise. |
| `predicate` | Returns a transform that returns `None` if the predicate does not match the path and the unmodified path otherwise. |
| `move_dir(source, target)` | Returns a transform that moves all files from the `source` to the `target` dir. |
| `move(source, target)` | Returns a transform that moves the `source` file to `target`. |
| `rename(old, new)` | Renames a single file. |
| `re_move(regex, sub)` | Moves all files matching the given regular expression. The different captured groups are available under their index and can be used together with normal python format methods: `re_move(r"Blatt (\d+)\.pdf", "Blätter/Blatt_{1:0>2}.pdf"),`. |
| `re_rename(old, new)` | Same as `re_move` but operates on the path *names* instead of the full path. |
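For the first two helpers in the table, which have no example further below, a short sketch (patterns invented):

```py
# glob: keep only PDF files, leaving their paths unchanged
only_pdfs = glob("*.pdf")

# predicate: the same effect, written out by hand
only_pdfs_too = predicate(lambda path: path.suffix == ".pdf")
```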
And PFERD also offers a few combinator functions:
* **`keep`**
`keep` just returns the input path unchanged. It can be very useful as the
last argument in an `attempt` call, to leave everything not matching a rule
unchanged.
* **`optionally(transformer)`**
Wraps a given transformer and returns its result if it is not `None`.
Otherwise returns the input path unchanged.
* **`do(transformers)`**
`do` accepts a series of transformers and applies them in the given order to
the result of the previous one. If any transformer returns `None`, do
short-circuits and also returns `None`. This can be used to perform multiple
renames in a row:
```py
do(
# Move them
move_dir("Vorlesungsmaterial/Vorlesungsvideos/", "Vorlesung/Videos/"),
# Fix extensions (if they have any)
optionally(re_rename("(.*).m4v.mp4", "{1}.mp4")),
# Remove the 'dbs' prefix (if they have any)
optionally(re_rename("(?i)dbs-(.+)", "{1}")),
),
```
* **`attempt(transformers)`**
`attempt` applies the passed transformers in the given order until it finds
one that does not return `None`. If it does not find any, it returns `None`.
This can be used to give a list of possible transformations and it will
automatically pick the first one that fits:
```py
attempt(
# Move all videos. If a video is passed in, this `re_move` will succeed
# and attempt short-circuits with the result.
re_move(r"Vorlesungsmaterial/.*/(.+?)\.mp4", "Vorlesung/Videos/{1}.mp4"),
# Move the whole folder to a nicer name - now without any mp4!
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
# If we got another file, keep it.
keep,
)
```
All of these combinators are used in the provided example config, if you want
to see some more true-to-life usages.
### A short, but commented example
```py
def filter_course(path: PurePath) -> bool:
# Note that glob returns a Transformer
# - a function from PurePath -> Optional[PurePath]
# So we need to apply the result of 'glob' to our input path.
# We need to crawl the 'Tutorien' folder as it contains the one we want.
if glob("Tutorien/")(path):
return True
# If we found 'Tutorium 10', keep it!
if glob("Tutorien/Tutorium 10")(path):
return True
# Discard all other folders inside 'Tutorien'
if glob("Tutorien/*")(path):
return False
# All other dirs (including subdirs of 'Tutorium 10') should be searched :)
return True
enable_logging() # needed once before calling a Pferd method
# Create a Pferd instance rooted in the same directory as the script file
# This is not a test run, so files will be downloaded (default, can be omitted)
pferd = Pferd(Path(__file__).parent, test_run=False)
# Use the ilias_kit helper to synchronize an ILIAS course
pferd.ilias_kit(
# The folder all of the course's content should be placed in
Path("My cool course"),
# The course ID (found in the URL when on the course page in ILIAS)
"course id",
# A path to a cookie jar. If you synchronize multiple ILIAS courses, setting this
# to a common value requires you to only login once.
cookies=Path("ilias_cookies.txt"),
# A transform to apply to all found paths
transform=transform_course,
# A crawl filter limits what paths the crawler searches
dir_filter=filter_course,
)
```


@ -1,342 +1,129 @@
#!/bin/env python3
import argparse
from pathlib import Path, PurePath
import re
import sys
from PFERD import Pferd
from PFERD.ilias import IliasElementType
from PFERD.transform import (attempt, do, glob, keep, move, move_dir,
optionally, re_move, re_rename)
import PFERD
from PFERD.utils import get_base_dir, move, rename
tf_ss_2020_numerik = attempt(
re_move(r"Übungsblätter/(\d+)\. Übungsblatt/.*", "Blätter/Blatt_{1:0>2}.pdf"),
keep,
)
#PFERD.enable_logging(logging.DEBUG)
PFERD.enable_logging()
base_dir = get_base_dir(__file__)
tf_ss_2020_db = attempt(
move_dir("Begrüßungsvideo/", "Vorlesung/Videos/"),
do(
move_dir("Vorlesungsmaterial/Vorlesungsvideos/", "Vorlesung/Videos/"),
optionally(re_rename("(.*).m4v.mp4", "{1}.mp4")),
optionally(re_rename("(?i)dbs-(.+)", "{1}")),
),
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
keep,
)
# Semester 1
def gbi_filter(path):
# Tutorien rausfiltern
if path.parts[:1] == ("Tutoriumsfolien",):
if path.parts[1:] == (): return True
if path.parts[1:2] == ("Tutorium 15",): return True
tf_ss_2020_rechnernetze = attempt(
re_move(r"Vorlesungsmaterial/.*/(.+?)\.mp4", "Vorlesung/Videos/{1}.mp4"),
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
keep,
)
tf_ss_2020_sicherheit = attempt(
move_dir("Vorlesungsvideos/", "Vorlesung/Videos/"),
move_dir("Übungsvideos/", "Übung/Videos/"),
re_move(r"VL(.*)\.pdf", "Vorlesung/{1}.pdf"),
re_move(r"Übungsblatt (\d+)\.pdf", "Blätter/Blatt_{1:0>2}.pdf"),
move("Chiffrat.txt", "Blätter/Blatt_01_Chiffrat.txt"),
keep,
)
tf_ss_2020_pg = attempt(
move_dir("Vorlesungsaufzeichnungen/", "Vorlesung/Videos/"),
move_dir("Vorlesungsmaterial/", "Vorlesung/"),
re_move(r"Übungen/uebungsblatt(\d+).pdf", "Blätter/Blatt_{1:0>2}.pdf"),
keep,
)
def df_ss_2020_or1(path: PurePath, _type: IliasElementType) -> bool:
if glob("Tutorien/")(path):
return True
if glob("Tutorien/Tutorium 10, dienstags 15:45 Uhr/")(path):
return True
if glob("Tutorien/*")(path):
return False
return True
def gbi_transform(path):
# Übungsblätter in Blätter/blatt_xx.pdf
new_path = move(path, ("Übungsblätter",), ("Blätter",))
if new_path is not None:
match = re.match(r"(\d+).aufgaben.pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
tf_ss_2020_or1 = attempt(
move_dir("Vorlesung/Unbeschriebene Folien/", "Vorlesung/Folien/"),
move_dir("Video zur Organisation/", "Vorlesung/Videos/"),
keep,
)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--test-run", action="store_true")
parser.add_argument("synchronizers", nargs="*")
args = parser.parse_args()
pferd = Pferd(Path(__file__).parent, test_run=args.test_run)
pferd.enable_logging()
if not args.synchronizers or "numerik" in args.synchronizers:
pferd.ilias_kit(
target="Numerik",
course_id="1083036",
transform=tf_ss_2020_numerik,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "db" in args.synchronizers:
pferd.ilias_kit(
target="DB",
course_id="1101554",
transform=tf_ss_2020_db,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "rechnernetze" in args.synchronizers:
pferd.ilias_kit(
target="Rechnernetze",
course_id="1099996",
transform=tf_ss_2020_rechnernetze,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "sicherheit" in args.synchronizers:
pferd.ilias_kit(
target="Sicherheit",
course_id="1101980",
transform=tf_ss_2020_sicherheit,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "pg" in args.synchronizers:
pferd.ilias_kit(
target="PG",
course_id="1106095",
transform=tf_ss_2020_pg,
cookies="ilias_cookies.txt",
)
if not args.synchronizers or "or1" in args.synchronizers:
pferd.ilias_kit(
target="OR1",
course_id="1105941",
dir_filter=df_ss_2020_or1,
transform=tf_ss_2020_or1,
cookies="ilias_cookies.txt",
)
match = re.match(r"(\d+).loesungen.pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"loesung_{number:02}.pdf")
return new_path
# Folien in Folien/*
new_path = move(path, ("Vorlesung: Folien",), ("Folien",))
if new_path is not None: return new_path
# Skripte in Skripte/*
new_path = move(path, ("Vorlesung: Skript",), ("Skripte",))
if new_path is not None:
if new_path.name == "k-21-relationen-skript.pdf":
return rename(new_path, "21-relationen-skript.pdf")
return new_path
# Übungsfolien in Übung/*
new_path = move(path, ("große Übung: Folien",), ("Übung",))
if new_path is not None: return new_path
# Tutoriumsfolien in Tutorium/*
new_path = move(path, ("Tutoriumsfolien","Tutorium 15"), ("Tutorium",))
if new_path is not None:
if new_path.name == "GBI_Tut_2 (1).pdf":
return rename(new_path, "GBI_Tut_2.pdf")
if new_path.name == "GBI_Tut_7 (1).pdf":
return rename(new_path, "GBI_Tut_7.pdf")
return new_path
return path
def hm1_transform(path):
match = re.match(r"blatt(\d+).pdf", path.name)
if match:
new_path = move(path, (), ("Blätter",))
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
match = re.match(r"blatt(\d+).loesungen.pdf", path.name)
if match:
new_path = move(path, (), ("Blätter",))
number = int(match.group(1))
return rename(new_path, f"loesung_{number:02}.pdf")
return path
def la1_filter(path):
# Tutorien rausfitern
if path.parts[:1] == ("Tutorien",):
if path.parts[1:] == (): return True
if path.parts[1:2] == ("Tutorium 03 - Philipp Faller",): return True
if path.parts[1:2] == ("Tutorium 23 - Sebastian Faller",): return True
return False
return True
def la1_transform(path):
# Alle Übungsblätter in Blätter/blatt_xx.pdf
# Alles andere Übungsmaterial in Blätter/*
new_path = move(path, ("Übungen",), ("Blätter",))
if new_path is not None:
match = re.match(r"Blatt(\d+).pdf", new_path.name)
if match:
number = int(match.group(1))
return rename(new_path, f"blatt_{number:02}.pdf")
if new_path.name == "Lösungen zu Blatt 1, Aufgabe 1 und Blatt 2, Aufgabe 4..pdf":
return rename(new_path, "Lösungen zu Blatt 1, Aufgabe 1 und Blatt 2, Aufgabe 4.pdf")
return new_path
# Alles Tutoriengedöns von Philipp in Tutorium/Philipp/*
new_path = move(path, ("Tutorien","Tutorium 03 - Philipp Faller"), ("Tutorium","Philipp"))
if new_path is not None:
if new_path.name == "tut2.pdf":
return rename(new_path, "Tut2.pdf")
return new_path
# Alles Tutoriengedöns von Sebastian in Tutorium/Sebastian/*
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 1"), ("Tutorium","Sebastian", "tut01"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 2", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut02.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 3", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut03.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 4", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut04.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 5", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut05.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 6", "aufgaben.pdf"), ("Tutorium","Sebastian", "tut06.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 7", "tut7.pdf"), ("Tutorium","Sebastian", "tut07.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 8", "tut8.pdf"), ("Tutorium","Sebastian", "tut08.pdf"))
if new_path is not None: return new_path
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 9", "tut9.pdf"), ("Tutorium","Sebastian", "tut09.pdf"))
if new_path is not None: return new_path
if path.parts == ("Tutorien","Tutorium 23 - Sebastian Faller", "Tutorium 10", "tut10.pdf"): return None
new_path = move(path, ("Tutorien","Tutorium 23 - Sebastian Faller"), ("Tutorium","Sebastian"))
if new_path is not None:
return new_path
# Übungs-Gedöns in Übung/*
new_path = move(path, ("Informatikervorlesung", "Übungsfolien"), ("Übung",))
if new_path is not None:
if new_path.name == "Übung_06_ausgewählte Folien.pdf":
return rename(new_path, "Übung_06_ausgewählte_Folien.pdf")
return new_path
# Vorlesungsfolien-Gedöns in Folien/*
new_path = move(path, ("Informatikervorlesung", "Folien.Notizen"), ("Folien",))
if new_path is not None:
return new_path
# Rest in Hauptverzeichnis
new_path = move(path, ("Informatikervorlesung",), ())
if new_path is not None:
# Rename filenames that are invalid on FAT systems
if new_path.name == "Evaluationsergebnisse: Übung.pdf":
return rename(new_path, "Evaluationsergebnisse_Übung.pdf")
if new_path.name == "Skript \"Lineare Algebra\" von Stefan Kühnlein.pdf":
return rename(new_path, "Skript Lineare Algebra von Stefan kühnlein.pdf")
return new_path
return path
def prog_filter(path):
    # Filter out tutorials
    if path.parts[:1] == ("Tutorien",): return False

    return True

def prog_transform(path):
    # Exercise sheets go into Blätter/*
    new_path = move(path, ("Übungen",), ("Blätter",))
    if new_path is not None:
        if new_path.name == "assignmen04.pdf":
            return rename(new_path, "assignment04.pdf")
        return new_path

    # Slides go into Folien/*
    new_path = move(path, ("Vorlesungsmaterial",), ("Folien",))
    if new_path is not None:
        if new_path.name == "00.1_Begruessung.pdf":
            return rename(new_path, "00-01_Begruessung.pdf")
        if new_path.name == "00.2_Organisatorisches.pdf":
            return rename(new_path, "00-02_Organisatorisches.pdf")
        if new_path.name == "01-01_ Einfache-Programme.pdf":
            return rename(new_path, "01-01_Einfache_Programme.pdf")
        if new_path.name == "13_Finden_und_ Beheben_von_Fehlern.pdf":
            return rename(new_path, "13_Finden_und_Beheben_von_Fehlern.pdf")
        return new_path

    return path
# Semester 2

def algo1_filter(path):
    # Filter out tutorials
    if path.parts[:1] == ("Tutorien",):
        if path.parts[1:] == (): return True
        #if path.parts[1:2] == ("Tutorium 15",): return True
        return False

    return True

def algo1_transform(path):
    # Slides go into Folien/*
    new_path = move(path, ("Vorlesungsfolien",), ("Folien",))
    if new_path is not None:
        return new_path

    return path
def hm2_transform(path):
    match = re.match(r"blatt(\d+).pdf", path.name)
    if match:
        new_path = move(path, (), ("Blätter",))
        number = int(match.group(1))
        return rename(new_path, f"blatt_{number:02}.pdf")

    match = re.match(r"blatt(\d+).loesungen.pdf", path.name)
    if match:
        new_path = move(path, (), ("Blätter",))
        number = int(match.group(1))
        return rename(new_path, f"loesung_{number:02}.pdf")

    return path
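
Concretely, and assuming the move/rename sketch given earlier, hm2_transform sorts the HM2 worksheets by number and type, for example (hypothetical usage, not code from the config):

# Hypothetical usage, assuming the move/rename sketch above:
from pathlib import PurePath

print(hm2_transform(PurePath("blatt3.pdf")))            # Blätter/blatt_03.pdf
print(hm2_transform(PurePath("blatt3.loesungen.pdf")))  # Blätter/loesung_03.pdf

Any file that matches neither pattern is left at its original path.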
def la2_filter(path):
    # Filter out tutorials
    if path.parts[:1] == ("Tutorien",):
        if path.parts[1:] == (): return True
        #if path.parts[1:2] == ("Tutorium 15",): return True
        return False

    return True

def la2_transform(path):
    # Slides go into Folien/*
    new_path = move(path, ("Vorlesungsmaterial",), ("Folien",))
    if new_path is not None: return new_path

    # All exercise sheets go into Blätter/blatt_xx.pdf
    # All other exercise material goes into Blätter/*
    new_path = move(path, ("Übungen",), ("Blätter",))
    if new_path is not None:
        match = re.match(r"Blatt(\d+).pdf", new_path.name)
        if match:
            number = int(match.group(1))
            return rename(new_path, f"blatt_{number:02}.pdf")
        return new_path

    return path
def swt1_filter(path):
    # Filter out tutorials
    if path.parts[:1] == ("Tutorien",):
        if path.parts[1:] == (): return True
        #if path.parts[1:2] == ("Tutorium 15",): return True
        return False

    return True

def swt1_transform(path):
    # Slides go into Folien/*
    new_path = move(path, ("Vorlesungsmaterial",), ("Folien",))
    if new_path is not None: return new_path

    # Exercise sheets go into Blätter/*
    new_path = move(path, ("Übungen",), ("Blätter",))
    if new_path is not None: return new_path

    return path
# Main part of the config

def main(args):
    args = [arg.lower() for arg in args]

    ffm = PFERD.FfM(base_dir)
    ilias = PFERD.Ilias(base_dir, "cookie_jar")
    norbert = PFERD.Norbert(base_dir)

    # Semester 1

    if not args or "gbi" in args:
        ilias.synchronize("855240", "GBI",
                transform=gbi_transform, filter=gbi_filter)
    if not args or "hm1" in args:
        ffm.synchronize("iana2/lehre/hm1info2018w", "HM1",
                transform=hm1_transform)
    if not args or "la1" in args:
        ilias.synchronize("874938", "LA1",
                transform=la1_transform, filter=la1_filter)
    if not args or "prog" in args:
        ilias.synchronize("851237", "Prog",
                transform=prog_transform, filter=prog_filter)
    if not args or "norbert" in args:
        norbert.synchronize("Prog-Tut")

    # Semester 2

    if not args or "algo1" in args:
        ilias.synchronize("959260", "Algo1",
                transform=algo1_transform, filter=algo1_filter)
    if not args or "hm2" in args:
        ffm.synchronize("iana2/lehre/hm2info2019s", "HM2",
                transform=hm2_transform)
    if not args or "la2" in args:
        ilias.synchronize("950588", "LA2",
                transform=la2_transform, filter=la2_filter)
    if not args or "swt1" in args:
        ilias.synchronize("945596", "SWT1",
                transform=swt1_transform, filter=swt1_filter)

if __name__ == "__main__":
    args = sys.argv[1:]
    main(args)
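Run without arguments, this config synchronizes every course; passing one or more of the lowercase course keys (gbi, hm1, la1, prog, norbert, algo1, hm2, la2, swt1) restricts the run to those courses, since the arguments are lowercased first and then matched against each block.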

example_config_minimal.py Normal file

@@ -0,0 +1,35 @@
"""
This is a small config that just crawls the ILIAS Personal Desktop.
It does not filter or rename anything, it just gobbles up everything it can find.
Note that this still includes a test-run switch, so you can see what it *would* download.
You can enable that with the "--test-run" command line switch,
i. e. "python3 example_config_minimal.py --test-run".
"""
import argparse
from pathlib import Path
from PFERD import Pferd
def main() -> None:
# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument("--test-run", action="store_true")
args = parser.parse_args()
# Create the Pferd helper instance
pferd = Pferd(Path(__file__).parent, test_run=args.test_run)
pferd.enable_logging()
# Synchronize the personal desktop into the "ILIAS" directory.
# It saves the cookies, so you only need to log in again when the ILIAS cookies expire.
pferd.ilias_kit_personal_desktop(
"ILIAS",
cookies="ilias_cookies.txt",
)
if __name__ == "__main__":
main()

mypy.ini Normal file

@@ -0,0 +1,7 @@
[mypy]
disallow_untyped_defs = True
disallow_incomplete_defs = True
no_implicit_optional = True

[mypy-rich.*,bs4]
ignore_missing_imports = True
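
As a rough illustration of what these settings enforce (hypothetical code, not part of the repository): disallow_untyped_defs and disallow_incomplete_defs make mypy reject any function definition without full annotations, and no_implicit_optional requires a None default to be typed as Optional explicitly.

# Hypothetical example, not part of the repository.
from typing import Optional

def fetch(url):                         # rejected: no type annotations at all
    return url

def fetch_partial(url: str, timeout=5):  # rejected: annotations are incomplete
    return url

def fetch_typed(url: str, retries: Optional[int] = None) -> str:  # passes
    return url

The [mypy-rich.*,bs4] section simply silences missing-stub errors for the rich and beautifulsoup4 imports.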

setup.py

@@ -1,14 +1,14 @@
-from setuptools import setup
+from setuptools import find_packages, setup
 
 setup(
-    name="PFERD",
-    version="1.1.4",
-    packages=["PFERD"],
-    install_requires=[
-        "requests>=2.21.0",
-        "beautifulsoup4>=4.7.1",
-        "colorama>=0.4.1"
-    ],
+    name="PFERD",
+    version="2.1.0",
+    packages=find_packages(),
+    install_requires=[
+        "requests>=2.21.0",
+        "beautifulsoup4>=4.7.1",
+        "rich>=1.0.0"
+    ],
 )
# When updating the version, also:
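
In substance, this change bumps the version to 2.1.0, discovers packages with find_packages() instead of a hard-coded list (so newly added subpackages are picked up automatically at install time), and replaces the colorama dependency with rich for terminal output.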