mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Compare commits
84 Commits
Author | SHA1 | Date | |
---|---|---|---|
cccd68e04a | |||
2bd40a5f30 | |||
2ca1101326 | |||
c1ab7485e2 | |||
29cd5d1a3c | |||
6d5d9333ad | |||
7cc40595dc | |||
80ae5ddfaa | |||
4f480d117e | |||
1f2af3a290 | |||
14cdfb6a69 | |||
e2bf84392b | |||
946b7a7931 | |||
9a9018751e | |||
83b75e8254 | |||
35c3fa205d | |||
0b606f02fa | |||
fb78a6e98e | |||
5de68a0400 | |||
f0562049b6 | |||
0e1077bb50 | |||
c978e9edf4 | |||
2714ac6be6 | |||
9b048a9cfc | |||
1c2b6bf994 | |||
ee39aaf08b | |||
93e6329901 | |||
f47b137b59 | |||
83ea15ee83 | |||
75471c46d1 | |||
1e0343bba6 | |||
0f5e55648b | |||
57259e21f4 | |||
4ce385b262 | |||
2d64409542 | |||
fcb3884a8f | |||
9f6dc56a7b | |||
56ab473611 | |||
6426060804 | |||
49a0ca7a7c | |||
f3a4663491 | |||
ecdbca8fb6 | |||
9cbea5fe06 | |||
ba3c7f85fa | |||
ba9215ebe8 | |||
8ebf0eab16 | |||
cd90a60dee | |||
98834c9c95 | |||
55e9e719ad | |||
a0ae9aee27 | |||
1486a63854 | |||
733e1ae136 | |||
4ac51048c1 | |||
f2aba970fd | |||
9c4759103a | |||
316b9d7bf4 | |||
6f30adcd22 | |||
6f78fef604 | |||
f830b42a36 | |||
ef343dec7c | |||
0da2fafcd8 | |||
f4abe3197c | |||
38d4f5b4c9 | |||
9ea03bda3e | |||
07de5bea8b | |||
f0d572c110 | |||
076067e22d | |||
ebb6e63c5c | |||
0c3f35a2d2 | |||
521890ae78 | |||
3f7c73df80 | |||
43100f69d5 | |||
d73c778b0a | |||
73c3eb0984 | |||
a519cbe05d | |||
b3ad9783c4 | |||
c1ccb6c53e | |||
51a713fa04 | |||
74ea039458 | |||
aaa6a2b6a4 | |||
e32a49480b | |||
be65051f9d | |||
3387bc5f20 | |||
3f0ae729d6 |
74
.github/workflows/package.yml
vendored
Normal file
74
.github/workflows/package.yml
vendored
Normal file
@ -0,0 +1,74 @@
|
||||
name: Package Application with Pyinstaller
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- "*"
|
||||
tags:
|
||||
- "v*"
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest, macos-latest]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '3.x'
|
||||
|
||||
- name: "Install dependencies"
|
||||
run: "pip install setuptools keyring pyinstaller rich requests beautifulsoup4 -f --upgrade"
|
||||
|
||||
- name: "Install sync_url.py"
|
||||
run: "pyinstaller sync_url.py -F"
|
||||
|
||||
- name: "Move artifact"
|
||||
run: "mv dist/sync_url* dist/sync_url-${{ matrix.os }}"
|
||||
|
||||
- uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: "Pferd Sync URL"
|
||||
path: "dist/sync_url*"
|
||||
|
||||
release:
|
||||
name: Release
|
||||
|
||||
needs: [build]
|
||||
runs-on: ubuntu-latest
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
steps:
|
||||
- name: "Checkout"
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: "Download artifacts"
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: "Pferd Sync URL"
|
||||
|
||||
- name: "look at folder structure"
|
||||
run: "ls -lah"
|
||||
|
||||
- name: "Rename releases"
|
||||
run: "mv sync_url-macos-latest pferd_sync_url_mac && mv sync_url-ubuntu-latest pferd_sync_url_linux && mv sync_url-windows-latest pferd_sync_url.exe"
|
||||
|
||||
- name: "Create release"
|
||||
uses: softprops/action-gh-release@v1
|
||||
|
||||
- name: "Upload release artifacts"
|
||||
uses: softprops/action-gh-release@v1
|
||||
with:
|
||||
body: "Download the correct sync_url for your platform and run it in the terminal or CMD. You might need to make it executable on Linux/Mac with `chmod +x <file>`. Also please enclose the *url you pass to the program in double quotes* or your shell might silently screw it up!"
|
||||
files: |
|
||||
pferd_sync_url_mac
|
||||
pferd_sync_url_linux
|
||||
pferd_sync_url.exe
|
5
.gitignore
vendored
5
.gitignore
vendored
@ -8,3 +8,8 @@ build/
|
||||
.env
|
||||
.vscode
|
||||
ilias_cookies.txt
|
||||
PFERD.egg-info/
|
||||
|
||||
# PyInstaller
|
||||
sync_url.spec
|
||||
dist/
|
||||
|
2
LICENSE
2
LICENSE
@ -1,4 +1,4 @@
|
||||
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw
|
||||
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
|
@ -3,8 +3,19 @@ General authenticators useful in many situations
|
||||
"""
|
||||
|
||||
import getpass
|
||||
import logging
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .logging import PrettyLogger
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
PRETTY = PrettyLogger(LOGGER)
|
||||
|
||||
try:
|
||||
import keyring
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
class TfaAuthenticator:
|
||||
# pylint: disable=too-few-public-methods
|
||||
@ -123,3 +134,81 @@ class UserPassAuthenticator:
|
||||
if self._given_username is not None and self._given_password is not None:
|
||||
self._given_username = None
|
||||
self._given_password = None
|
||||
|
||||
|
||||
class KeyringAuthenticator(UserPassAuthenticator):
|
||||
"""
|
||||
An authenticator for username-password combinations that stores the
|
||||
password using the system keyring service and prompts the user for missing
|
||||
information.
|
||||
"""
|
||||
|
||||
def get_credentials(self) -> Tuple[str, str]:
|
||||
"""
|
||||
Returns a tuple (username, password). Prompts user for username or
|
||||
password when necessary.
|
||||
"""
|
||||
|
||||
if self._username is None and self._given_username is not None:
|
||||
self._username = self._given_username
|
||||
|
||||
if self._password is None and self._given_password is not None:
|
||||
self._password = self._given_password
|
||||
|
||||
if self._username is not None and self._password is None:
|
||||
self._load_password()
|
||||
|
||||
if self._username is None or self._password is None:
|
||||
print(f"Enter credentials ({self._reason})")
|
||||
|
||||
username: str
|
||||
if self._username is None:
|
||||
username = input("Username: ")
|
||||
self._username = username
|
||||
else:
|
||||
username = self._username
|
||||
|
||||
if self._password is None:
|
||||
self._load_password()
|
||||
|
||||
password: str
|
||||
if self._password is None:
|
||||
password = getpass.getpass(prompt="Password: ")
|
||||
self._password = password
|
||||
self._save_password()
|
||||
else:
|
||||
password = self._password
|
||||
|
||||
return (username, password)
|
||||
|
||||
def _load_password(self) -> None:
|
||||
"""
|
||||
Loads the saved password associated with self._username from the system
|
||||
keyring service (or None if not password has been saved yet) and stores
|
||||
it in self._password.
|
||||
"""
|
||||
self._password = keyring.get_password("pferd-ilias", self._username)
|
||||
|
||||
def _save_password(self) -> None:
|
||||
"""
|
||||
Saves self._password to the system keyring service and associates it
|
||||
with self._username.
|
||||
"""
|
||||
keyring.set_password("pferd-ilias", self._username, self._password)
|
||||
|
||||
def invalidate_credentials(self) -> None:
|
||||
"""
|
||||
Marks the credentials as invalid. If only a username was supplied in
|
||||
the constructor, assumes that the username is valid and only the
|
||||
password is invalid. If only a password was supplied in the
|
||||
constructor, assumes that the password is valid and only the username
|
||||
is invalid. Otherwise, assumes that username and password are both
|
||||
invalid.
|
||||
"""
|
||||
|
||||
try:
|
||||
keyring.delete_password("pferd-ilias", self._username)
|
||||
except keyring.errors.PasswordDeleteError:
|
||||
pass
|
||||
|
||||
super().invalidate_credentials()
|
||||
|
@ -5,6 +5,12 @@ from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
|
||||
def _mergeNoDuplicate(first: List[Path], second: List[Path]) -> List[Path]:
|
||||
tmp = list(set(first + second))
|
||||
tmp.sort(key=lambda x: str(x.resolve()))
|
||||
return tmp
|
||||
|
||||
|
||||
class DownloadSummary:
|
||||
"""
|
||||
Keeps track of all new, modified or deleted files and provides a summary.
|
||||
@ -40,9 +46,9 @@ class DownloadSummary:
|
||||
"""
|
||||
Merges ourselves with the passed summary. Modifies this object, but not the passed one.
|
||||
"""
|
||||
self._new_files += summary.new_files
|
||||
self._modified_files += summary.modified_files
|
||||
self._deleted_files += summary.deleted_files
|
||||
self._new_files = _mergeNoDuplicate(self._new_files, summary.new_files)
|
||||
self._modified_files = _mergeNoDuplicate(self._modified_files, summary.modified_files)
|
||||
self._deleted_files = _mergeNoDuplicate(self._deleted_files, summary.deleted_files)
|
||||
|
||||
def add_deleted_file(self, path: Path) -> None:
|
||||
"""
|
||||
|
@ -37,3 +37,21 @@ def swallow_and_print_errors(function: TFun) -> TFun:
|
||||
Console().print_exception()
|
||||
return None
|
||||
return cast(TFun, inner)
|
||||
|
||||
|
||||
def retry_on_io_exception(max_retries: int, message: str) -> Callable[[TFun], TFun]:
|
||||
"""
|
||||
Decorates a function and retries it on any exception until the max retries count is hit.
|
||||
"""
|
||||
def retry(function: TFun) -> TFun:
|
||||
def inner(*args: Any, **kwargs: Any) -> Any:
|
||||
for i in range(0, max_retries):
|
||||
# pylint: disable=broad-except
|
||||
try:
|
||||
return function(*args, **kwargs)
|
||||
except IOError as error:
|
||||
PRETTY.warning(f"Error duing operation '{message}': {error}")
|
||||
PRETTY.warning(
|
||||
f"Retrying operation '{message}'. Remaining retries: {max_retries - 1 - i}")
|
||||
return cast(TFun, inner)
|
||||
return retry
|
||||
|
@ -37,8 +37,12 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
|
||||
Authenticate via KIT's shibboleth system.
|
||||
"""
|
||||
|
||||
def __init__(self, username: Optional[str] = None, password: Optional[str] = None) -> None:
|
||||
self._auth = UserPassAuthenticator("KIT ILIAS Shibboleth", username, password)
|
||||
def __init__(self, authenticator: Optional[UserPassAuthenticator] = None) -> None:
|
||||
if authenticator:
|
||||
self._auth = authenticator
|
||||
else:
|
||||
self._auth = UserPassAuthenticator("KIT ILIAS Shibboleth")
|
||||
|
||||
self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth")
|
||||
|
||||
def authenticate(self, sess: requests.Session) -> None:
|
||||
@ -70,6 +74,8 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
|
||||
form = soup.find("form", {"class": "full content", "method": "post"})
|
||||
action = form["action"]
|
||||
|
||||
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
|
||||
|
||||
# Equivalent: Enter credentials in
|
||||
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
|
||||
LOGGER.debug("Attempt to log in to Shibboleth using credentials")
|
||||
@ -78,6 +84,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
|
||||
"_eventId_proceed": "",
|
||||
"j_username": self._auth.username,
|
||||
"j_password": self._auth.password,
|
||||
"csrf_token": csrf_token
|
||||
}
|
||||
soup = soupify(sess.post(url, data=data))
|
||||
|
||||
|
@ -15,7 +15,7 @@ from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
|
||||
import bs4
|
||||
import requests
|
||||
|
||||
from ..errors import FatalException
|
||||
from ..errors import FatalException, retry_on_io_exception
|
||||
from ..logging import PrettyLogger
|
||||
from ..utils import soupify
|
||||
from .authenticators import IliasAuthenticator
|
||||
@ -26,6 +26,10 @@ LOGGER = logging.getLogger(__name__)
|
||||
PRETTY = PrettyLogger(LOGGER)
|
||||
|
||||
|
||||
def _sanitize_path_name(name: str) -> str:
|
||||
return name.replace("/", "-").replace("\\", "-")
|
||||
|
||||
|
||||
class IliasElementType(Enum):
|
||||
"""
|
||||
The type of an ilias element.
|
||||
@ -36,6 +40,7 @@ class IliasElementType(Enum):
|
||||
REGULAR_FILE = "REGULAR_FILE"
|
||||
VIDEO_FILE = "VIDEO_FILE"
|
||||
FORUM = "FORUM"
|
||||
MEETING = "MEETING"
|
||||
EXTERNAL_LINK = "EXTERNAL_LINK"
|
||||
|
||||
def is_folder(self) -> bool:
|
||||
@ -116,6 +121,16 @@ class IliasCrawler:
|
||||
|
||||
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
|
||||
|
||||
def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]:
|
||||
"""
|
||||
Crawls a given url *and all reachable elements in it*.
|
||||
|
||||
Args:
|
||||
url {str} -- the *full* url to crawl
|
||||
"""
|
||||
start_entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), url)
|
||||
return self._iterate_entries_to_download_infos(start_entries)
|
||||
|
||||
def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
|
||||
"""
|
||||
Starts the crawl process for a course, yielding a list of elements to (potentially)
|
||||
@ -134,7 +149,7 @@ class IliasCrawler:
|
||||
|
||||
if not self._is_course_id_valid(root_url, course_id):
|
||||
raise FatalException(
|
||||
"Invalid course id? The URL the server returned did not contain my id."
|
||||
"Invalid course id? I didn't find anything looking like a course!"
|
||||
)
|
||||
|
||||
# And treat it as a folder
|
||||
@ -143,7 +158,34 @@ class IliasCrawler:
|
||||
|
||||
def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
|
||||
response: requests.Response = self._session.get(root_url)
|
||||
return course_id in response.url
|
||||
# We were redirected ==> Non-existant ID
|
||||
if course_id not in response.url:
|
||||
return False
|
||||
|
||||
link_element: bs4.Tag = self._get_page(root_url, {}).find(id="current_perma_link")
|
||||
if not link_element:
|
||||
return False
|
||||
# It wasn't a course but a category list, forum, etc.
|
||||
return "crs_" in link_element.get("value")
|
||||
|
||||
def find_course_name(self, course_id: str) -> Optional[str]:
|
||||
"""
|
||||
Returns the name of a given course. None if it is not a valid course
|
||||
or it could not be found.
|
||||
"""
|
||||
course_url = self._url_set_query_param(
|
||||
self._base_url + "/goto.php", "target", f"crs_{course_id}"
|
||||
)
|
||||
return self.find_element_name(course_url)
|
||||
|
||||
def find_element_name(self, url: str) -> Optional[str]:
|
||||
"""
|
||||
Returns the name of the element at the given URL, if it can find one.
|
||||
"""
|
||||
focus_element: bs4.Tag = self._get_page(url, {}).find(id="il_mhead_t_focus")
|
||||
if not focus_element:
|
||||
return None
|
||||
return focus_element.text
|
||||
|
||||
def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
|
||||
"""
|
||||
@ -200,6 +242,8 @@ class IliasCrawler:
|
||||
entries_to_process += self._crawl_video_directory(entry.path, url)
|
||||
continue
|
||||
|
||||
PRETTY.warning(f"Unknown type: {entry.entry_type}!")
|
||||
|
||||
return result
|
||||
|
||||
def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]:
|
||||
@ -208,17 +252,45 @@ class IliasCrawler:
|
||||
"""
|
||||
soup = self._get_page(url, {})
|
||||
|
||||
if soup.find(id="headerimage"):
|
||||
element: bs4.Tag = soup.find(id="headerimage")
|
||||
if "opencast" in element.attrs["src"].lower():
|
||||
PRETTY.warning(f"Switched to crawling a video at {folder_path}")
|
||||
if not self.dir_filter(folder_path, IliasElementType.VIDEO_FOLDER):
|
||||
PRETTY.not_searching(folder_path, "user filter")
|
||||
return []
|
||||
return self._crawl_video_directory(folder_path, url)
|
||||
|
||||
result: List[IliasCrawlerEntry] = []
|
||||
|
||||
# Fetch all links and throw them to the general interpreter
|
||||
links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
|
||||
for link in links:
|
||||
abs_url = self._abs_url_from_link(link)
|
||||
element_path = Path(folder_path, link.getText().strip())
|
||||
element_path = Path(folder_path, _sanitize_path_name(link.getText().strip()))
|
||||
element_type = self._find_type_from_link(element_path, link, abs_url)
|
||||
|
||||
if element_type == IliasElementType.REGULAR_FILE:
|
||||
result += self._crawl_file(folder_path, link, abs_url)
|
||||
elif element_type == IliasElementType.MEETING:
|
||||
meeting_name = str(element_path.name)
|
||||
date_portion_str = meeting_name.split(" - ")[0]
|
||||
date_portion = demangle_date(date_portion_str)
|
||||
|
||||
if not date_portion:
|
||||
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
|
||||
continue
|
||||
|
||||
rest_of_name = meeting_name
|
||||
if rest_of_name.startswith(date_portion_str):
|
||||
rest_of_name = rest_of_name[len(date_portion_str):]
|
||||
|
||||
new_name = datetime.datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") \
|
||||
+ rest_of_name
|
||||
new_path = Path(folder_path, _sanitize_path_name(new_name))
|
||||
result += [
|
||||
IliasCrawlerEntry(new_path, abs_url, IliasElementType.REGULAR_FOLDER, None)
|
||||
]
|
||||
elif element_type is not None:
|
||||
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
|
||||
else:
|
||||
@ -270,6 +342,8 @@ class IliasCrawler:
|
||||
"""
|
||||
# pylint: disable=too-many-return-statements
|
||||
|
||||
found_parent: Optional[bs4.Tag] = None
|
||||
|
||||
# We look for the outer div of our inner link, to find information around it
|
||||
# (mostly the icon)
|
||||
for parent in link_element.parents:
|
||||
@ -300,6 +374,9 @@ class IliasCrawler:
|
||||
if str(img_tag["src"]).endswith("frm.svg"):
|
||||
return IliasElementType.FORUM
|
||||
|
||||
if str(img_tag["src"]).endswith("sess.svg"):
|
||||
return IliasElementType.MEETING
|
||||
|
||||
return IliasElementType.REGULAR_FOLDER
|
||||
|
||||
@staticmethod
|
||||
@ -331,7 +408,7 @@ class IliasCrawler:
|
||||
modification_date = demangle_date(modification_date_str)
|
||||
|
||||
# Grab the name from the link text
|
||||
name = link_element.getText()
|
||||
name = _sanitize_path_name(link_element.getText())
|
||||
full_path = Path(path, name + "." + file_type)
|
||||
|
||||
return [
|
||||
@ -462,7 +539,7 @@ class IliasCrawler:
|
||||
).getText().strip()
|
||||
title += ".mp4"
|
||||
|
||||
video_path: Path = Path(parent_path, title)
|
||||
video_path: Path = Path(parent_path, _sanitize_path_name(title))
|
||||
|
||||
video_url = self._abs_url_from_link(link)
|
||||
|
||||
@ -534,6 +611,7 @@ class IliasCrawler:
|
||||
# Two divs, side by side. Left is the name, right is the link ==> get left
|
||||
# sibling
|
||||
file_name = file_link.parent.findPrevious(name="div").getText().strip()
|
||||
file_name = _sanitize_path_name(file_name)
|
||||
url = self._abs_url_from_link(file_link)
|
||||
|
||||
LOGGER.debug("Found file %r at %r", file_name, url)
|
||||
@ -547,10 +625,18 @@ class IliasCrawler:
|
||||
|
||||
return results
|
||||
|
||||
def _get_page(self, url: str, params: Dict[str, Any]) -> bs4.BeautifulSoup:
|
||||
@retry_on_io_exception(3, "fetching webpage")
|
||||
def _get_page(self, url: str, params: Dict[str, Any],
|
||||
retry_count: int = 0) -> bs4.BeautifulSoup:
|
||||
"""
|
||||
Fetches a page from ILIAS, authenticating when needed.
|
||||
"""
|
||||
|
||||
if retry_count >= 4:
|
||||
raise FatalException("Could not get a proper page after 4 tries. "
|
||||
"Maybe your URL is wrong, authentication fails continuously, "
|
||||
"your ILIAS connection is spotty or ILIAS is not well.")
|
||||
|
||||
LOGGER.debug("Fetching %r", url)
|
||||
|
||||
response = self._session.get(url, params=params)
|
||||
@ -571,7 +657,7 @@ class IliasCrawler:
|
||||
|
||||
self._authenticator.authenticate(self._session)
|
||||
|
||||
return self._get_page(url, params)
|
||||
return self._get_page(url, params, retry_count + 1)
|
||||
|
||||
@staticmethod
|
||||
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
|
||||
|
@ -20,7 +20,7 @@ def demangle_date(date: str) -> Optional[datetime.datetime]:
|
||||
"Gestern, HH:MM"
|
||||
"Heute, HH:MM"
|
||||
"Morgen, HH:MM"
|
||||
"dd. mon.yyyy, HH:MM
|
||||
"dd. mon yyyy, HH:MM
|
||||
"""
|
||||
saved = locale.setlocale(locale.LC_ALL)
|
||||
try:
|
||||
|
@ -10,6 +10,7 @@ from typing import Callable, List, Optional, Union
|
||||
import bs4
|
||||
import requests
|
||||
|
||||
from ..errors import retry_on_io_exception
|
||||
from ..logging import PrettyLogger
|
||||
from ..organizer import Organizer
|
||||
from ..tmp_dir import TmpDir
|
||||
@ -116,15 +117,25 @@ class IliasDownloader:
|
||||
"""
|
||||
|
||||
LOGGER.debug("Downloading %r", info)
|
||||
|
||||
if not self._strategy(self._organizer, info):
|
||||
self._organizer.mark(info.path)
|
||||
return
|
||||
|
||||
tmp_file = self._tmp_dir.new_path()
|
||||
|
||||
while not self._try_download(info, tmp_file):
|
||||
LOGGER.info("Retrying download: %r", info)
|
||||
self._authenticator.authenticate(self._session)
|
||||
@retry_on_io_exception(3, "downloading file")
|
||||
def download_impl() -> bool:
|
||||
if not self._try_download(info, tmp_file):
|
||||
LOGGER.info("Re-Authenticating due to download failure: %r", info)
|
||||
self._authenticator.authenticate(self._session)
|
||||
raise IOError("Scheduled retry")
|
||||
else:
|
||||
return True
|
||||
|
||||
if not download_impl():
|
||||
PRETTY.error(f"Download of file {info.path} failed too often! Skipping it...")
|
||||
return
|
||||
|
||||
dst_path = self._organizer.accept_file(tmp_file, info.path)
|
||||
if dst_path and info.modification_date:
|
||||
|
154
PFERD/ipd.py
Normal file
154
PFERD/ipd.py
Normal file
@ -0,0 +1,154 @@
|
||||
"""
|
||||
Utility functions and a scraper/downloader for the IPD pages.
|
||||
"""
|
||||
import datetime
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Callable, List, Optional
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import bs4
|
||||
import requests
|
||||
|
||||
from PFERD.errors import FatalException
|
||||
from PFERD.utils import soupify
|
||||
|
||||
from .logging import PrettyLogger
|
||||
from .organizer import Organizer
|
||||
from .tmp_dir import TmpDir
|
||||
from .transform import Transformable
|
||||
from .utils import stream_to_path
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
PRETTY = PrettyLogger(LOGGER)
|
||||
|
||||
|
||||
@dataclass
|
||||
class IpdDownloadInfo(Transformable):
|
||||
"""
|
||||
Information about an ipd entry.
|
||||
"""
|
||||
url: str
|
||||
modification_date: Optional[datetime.datetime]
|
||||
|
||||
|
||||
IpdDownloadStrategy = Callable[[Organizer, IpdDownloadInfo], bool]
|
||||
|
||||
|
||||
def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) -> bool:
|
||||
"""
|
||||
Accepts new files or files with a more recent modification date.
|
||||
"""
|
||||
resolved_file = organizer.resolve(info.path)
|
||||
if not resolved_file.exists():
|
||||
return True
|
||||
if not info.modification_date:
|
||||
PRETTY.ignored_file(info.path, "could not find modification time, file exists")
|
||||
return False
|
||||
|
||||
resolved_mod_time_seconds = resolved_file.stat().st_mtime
|
||||
|
||||
# Download if the info is newer
|
||||
if info.modification_date.timestamp() > resolved_mod_time_seconds:
|
||||
return True
|
||||
|
||||
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
|
||||
return False
|
||||
|
||||
|
||||
class IpdCrawler:
|
||||
# pylint: disable=too-few-public-methods
|
||||
"""
|
||||
A crawler for IPD pages.
|
||||
"""
|
||||
|
||||
def __init__(self, base_url: str):
|
||||
self._base_url = base_url
|
||||
|
||||
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
|
||||
"""
|
||||
Create an absolute url from an <a> tag.
|
||||
"""
|
||||
return urljoin(self._base_url, link_tag.get("href"))
|
||||
|
||||
def crawl(self) -> List[IpdDownloadInfo]:
|
||||
"""
|
||||
Crawls the playlist given in the constructor.
|
||||
"""
|
||||
page = soupify(requests.get(self._base_url))
|
||||
|
||||
items: List[IpdDownloadInfo] = []
|
||||
|
||||
def is_relevant_url(x: str) -> bool:
|
||||
return x.endswith(".pdf") or x.endswith(".c") or x.endswith(".java") or x.endswith(".zip")
|
||||
|
||||
for link in page.findAll(name="a", attrs={"href": lambda x: x and is_relevant_url(x)}):
|
||||
href: str = link.attrs.get("href")
|
||||
name = href.split("/")[-1]
|
||||
|
||||
modification_date: Optional[datetime.datetime] = None
|
||||
try:
|
||||
enclosing_row: bs4.Tag = link.findParent(name="tr")
|
||||
if enclosing_row:
|
||||
date_text = enclosing_row.find(name="td").text
|
||||
modification_date = datetime.datetime.strptime(date_text, "%d.%m.%Y")
|
||||
except ValueError:
|
||||
modification_date = None
|
||||
|
||||
items.append(IpdDownloadInfo(
|
||||
Path(name),
|
||||
url=self._abs_url_from_link(link),
|
||||
modification_date=modification_date
|
||||
))
|
||||
|
||||
return items
|
||||
|
||||
|
||||
class IpdDownloader:
|
||||
"""
|
||||
A downloader for ipd files.
|
||||
"""
|
||||
|
||||
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy):
|
||||
self._tmp_dir = tmp_dir
|
||||
self._organizer = organizer
|
||||
self._strategy = strategy
|
||||
self._session = requests.session()
|
||||
|
||||
def download_all(self, infos: List[IpdDownloadInfo]) -> None:
|
||||
"""
|
||||
Download multiple files one after the other.
|
||||
"""
|
||||
for info in infos:
|
||||
self.download(info)
|
||||
|
||||
def download(self, info: IpdDownloadInfo) -> None:
|
||||
"""
|
||||
Download a single file.
|
||||
"""
|
||||
if not self._strategy(self._organizer, info):
|
||||
self._organizer.mark(info.path)
|
||||
return
|
||||
|
||||
with self._session.get(info.url, stream=True) as response:
|
||||
if response.status_code == 200:
|
||||
tmp_file = self._tmp_dir.new_path()
|
||||
stream_to_path(response, tmp_file, info.path.name)
|
||||
dst_path = self._organizer.accept_file(tmp_file, info.path)
|
||||
|
||||
if dst_path and info.modification_date:
|
||||
os.utime(
|
||||
dst_path,
|
||||
times=(
|
||||
math.ceil(info.modification_date.timestamp()),
|
||||
math.ceil(info.modification_date.timestamp())
|
||||
)
|
||||
)
|
||||
|
||||
elif response.status_code == 403:
|
||||
raise FatalException("Received 403. Are you not using the KIT VPN?")
|
||||
else:
|
||||
PRETTY.warning(f"Could not download file, got response {response.status_code}")
|
@ -3,13 +3,10 @@ Contains a few logger utility functions and implementations.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
from typing import Optional
|
||||
|
||||
from rich import print as rich_print
|
||||
from rich._log_render import LogRender
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.style import Style
|
||||
from rich.text import Text
|
||||
from rich.theme import Theme
|
||||
|
@ -7,8 +7,9 @@ import filecmp
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from enum import Enum
|
||||
from pathlib import Path, PurePath
|
||||
from typing import List, Optional, Set
|
||||
from typing import Callable, List, Optional, Set
|
||||
|
||||
from .download_summary import DownloadSummary
|
||||
from .location import Location
|
||||
@ -19,6 +20,51 @@ LOGGER = logging.getLogger(__name__)
|
||||
PRETTY = PrettyLogger(LOGGER)
|
||||
|
||||
|
||||
class ConflictType(Enum):
|
||||
"""
|
||||
The type of the conflict. A file might not exist anymore and will be deleted
|
||||
or it might be overwritten with a newer version.
|
||||
|
||||
FILE_OVERWRITTEN: An existing file will be updated
|
||||
MARKED_FILE_OVERWRITTEN: A file is written for the second+ time in this run
|
||||
FILE_DELETED: The file was deleted
|
||||
"""
|
||||
FILE_OVERWRITTEN = "overwritten"
|
||||
MARKED_FILE_OVERWRITTEN = "marked_file_overwritten"
|
||||
FILE_DELETED = "deleted"
|
||||
|
||||
|
||||
class FileConflictResolution(Enum):
|
||||
"""
|
||||
The reaction when confronted with a file conflict:
|
||||
|
||||
DESTROY_EXISTING: Delete/overwrite the current file
|
||||
KEEP_EXISTING: Keep the current file
|
||||
DEFAULT: Do whatever the PFERD authors thought is sensible
|
||||
PROMPT: Interactively ask the user
|
||||
"""
|
||||
|
||||
DESTROY_EXISTING = "destroy"
|
||||
|
||||
KEEP_EXISTING = "keep"
|
||||
|
||||
DEFAULT = "default"
|
||||
|
||||
PROMPT = "prompt"
|
||||
|
||||
|
||||
FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution]
|
||||
|
||||
|
||||
def resolve_prompt_user(_path: PurePath, conflict: ConflictType) -> FileConflictResolution:
|
||||
"""
|
||||
Resolves conflicts by asking the user if a file was written twice or will be deleted.
|
||||
"""
|
||||
if conflict == ConflictType.FILE_OVERWRITTEN:
|
||||
return FileConflictResolution.DESTROY_EXISTING
|
||||
return FileConflictResolution.PROMPT
|
||||
|
||||
|
||||
class FileAcceptException(Exception):
|
||||
"""An exception while accepting a file."""
|
||||
|
||||
@ -26,7 +72,7 @@ class FileAcceptException(Exception):
|
||||
class Organizer(Location):
|
||||
"""A helper for managing downloaded files."""
|
||||
|
||||
def __init__(self, path: Path):
|
||||
def __init__(self, path: Path, conflict_resolver: FileConflictResolver = resolve_prompt_user):
|
||||
"""Create a new organizer for a given path."""
|
||||
super().__init__(path)
|
||||
self._known_files: Set[Path] = set()
|
||||
@ -36,6 +82,8 @@ class Organizer(Location):
|
||||
|
||||
self.download_summary = DownloadSummary()
|
||||
|
||||
self.conflict_resolver = conflict_resolver
|
||||
|
||||
def accept_file(self, src: Path, dst: PurePath) -> Optional[Path]:
|
||||
"""
|
||||
Move a file to this organizer and mark it.
|
||||
@ -67,13 +115,16 @@ class Organizer(Location):
|
||||
|
||||
if self._is_marked(dst):
|
||||
PRETTY.warning(f"File {str(dst_absolute)!r} was already written!")
|
||||
if not prompt_yes_no(f"Overwrite file?", default=False):
|
||||
conflict = ConflictType.MARKED_FILE_OVERWRITTEN
|
||||
if self._resolve_conflict("Overwrite file?", dst_absolute, conflict, default=False):
|
||||
PRETTY.ignored_file(dst_absolute, "file was written previously")
|
||||
return None
|
||||
|
||||
# Destination file is directory
|
||||
if dst_absolute.exists() and dst_absolute.is_dir():
|
||||
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
|
||||
prompt = f"Overwrite folder {dst_absolute} with file?"
|
||||
conflict = ConflictType.FILE_OVERWRITTEN
|
||||
if self._resolve_conflict(prompt, dst_absolute, conflict, default=False):
|
||||
shutil.rmtree(dst_absolute)
|
||||
else:
|
||||
PRETTY.warning(f"Could not add file {str(dst_absolute)!r}")
|
||||
@ -87,6 +138,12 @@ class Organizer(Location):
|
||||
self.mark(dst)
|
||||
return dst_absolute
|
||||
|
||||
prompt = f"Overwrite file {dst_absolute}?"
|
||||
conflict = ConflictType.FILE_OVERWRITTEN
|
||||
if not self._resolve_conflict(prompt, dst_absolute, conflict, default=True):
|
||||
PRETTY.ignored_file(dst_absolute, "user conflict resolution")
|
||||
return None
|
||||
|
||||
self.download_summary.add_modified_file(dst_absolute)
|
||||
PRETTY.modified_file(dst_absolute)
|
||||
else:
|
||||
@ -124,6 +181,8 @@ class Organizer(Location):
|
||||
self._cleanup(self.path)
|
||||
|
||||
def _cleanup(self, start_dir: Path) -> None:
|
||||
if not start_dir.exists():
|
||||
return
|
||||
paths: List[Path] = list(start_dir.iterdir())
|
||||
|
||||
# Recursively clean paths
|
||||
@ -142,6 +201,24 @@ class Organizer(Location):
|
||||
def _delete_file_if_confirmed(self, path: Path) -> None:
|
||||
prompt = f"Do you want to delete {path}"
|
||||
|
||||
if prompt_yes_no(prompt, False):
|
||||
if self._resolve_conflict(prompt, path, ConflictType.FILE_DELETED, default=False):
|
||||
self.download_summary.add_deleted_file(path)
|
||||
path.unlink()
|
||||
else:
|
||||
PRETTY.ignored_file(path, "user conflict resolution")
|
||||
|
||||
def _resolve_conflict(
|
||||
self, prompt: str, path: Path, conflict: ConflictType, default: bool
|
||||
) -> bool:
|
||||
if not self.conflict_resolver:
|
||||
return prompt_yes_no(prompt, default=default)
|
||||
|
||||
result = self.conflict_resolver(path, conflict)
|
||||
if result == FileConflictResolution.DEFAULT:
|
||||
return default
|
||||
if result == FileConflictResolution.KEEP_EXISTING:
|
||||
return False
|
||||
if result == FileConflictResolution.DESTROY_EXISTING:
|
||||
return True
|
||||
|
||||
return prompt_yes_no(prompt, default=default)
|
||||
|
165
PFERD/pferd.py
165
PFERD/pferd.py
@ -6,6 +6,7 @@ import logging
|
||||
from pathlib import Path
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
from .authenticators import UserPassAuthenticator
|
||||
from .cookie_jar import CookieJar
|
||||
from .diva import (DivaDownloader, DivaDownloadStrategy, DivaPlaylistCrawler,
|
||||
diva_download_new)
|
||||
@ -14,9 +15,11 @@ from .errors import FatalException, swallow_and_print_errors
|
||||
from .ilias import (IliasAuthenticator, IliasCrawler, IliasDirectoryFilter,
|
||||
IliasDownloader, IliasDownloadInfo, IliasDownloadStrategy,
|
||||
KitShibbolethAuthenticator, download_modified_or_new)
|
||||
from .ipd import (IpdCrawler, IpdDownloader, IpdDownloadInfo,
|
||||
IpdDownloadStrategy, ipd_download_new_or_modified)
|
||||
from .location import Location
|
||||
from .logging import PrettyLogger, enable_logging
|
||||
from .organizer import Organizer
|
||||
from .organizer import FileConflictResolver, Organizer, resolve_prompt_user
|
||||
from .tmp_dir import TmpDir
|
||||
from .transform import TF, Transform, apply_transform
|
||||
from .utils import PathLike, to_path
|
||||
@ -62,6 +65,13 @@ class Pferd(Location):
|
||||
for transformable in transformables:
|
||||
LOGGER.info(transformable.path)
|
||||
|
||||
@staticmethod
|
||||
def _get_authenticator(
|
||||
username: Optional[str], password: Optional[str]
|
||||
) -> KitShibbolethAuthenticator:
|
||||
inner_auth = UserPassAuthenticator("ILIAS - Pferd.py", username, password)
|
||||
return KitShibbolethAuthenticator(inner_auth)
|
||||
|
||||
def _ilias(
|
||||
self,
|
||||
target: PathLike,
|
||||
@ -74,12 +84,13 @@ class Pferd(Location):
|
||||
download_strategy: IliasDownloadStrategy,
|
||||
timeout: int,
|
||||
clean: bool = True,
|
||||
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
|
||||
) -> Organizer:
|
||||
# pylint: disable=too-many-locals
|
||||
cookie_jar = CookieJar(to_path(cookies) if cookies else None)
|
||||
session = cookie_jar.create_session()
|
||||
tmp_dir = self._tmp_dir.new_subdir()
|
||||
organizer = Organizer(self.resolve(to_path(target)))
|
||||
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
|
||||
|
||||
crawler = IliasCrawler(base_url, session, authenticator, dir_filter)
|
||||
downloader = IliasDownloader(tmp_dir, organizer, session,
|
||||
@ -115,6 +126,7 @@ class Pferd(Location):
|
||||
download_strategy: IliasDownloadStrategy = download_modified_or_new,
|
||||
clean: bool = True,
|
||||
timeout: int = 5,
|
||||
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
|
||||
) -> Organizer:
|
||||
"""
|
||||
Synchronizes a folder with the ILIAS instance of the KIT.
|
||||
@ -142,9 +154,11 @@ class Pferd(Location):
|
||||
clean {bool} -- Whether to clean up when the method finishes.
|
||||
timeout {int} -- The download timeout for opencast videos. Sadly needed due to a
|
||||
requests bug.
|
||||
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
|
||||
with overwriting or deleting files. The default always asks the user.
|
||||
"""
|
||||
# This authenticator only works with the KIT ilias instance.
|
||||
authenticator = KitShibbolethAuthenticator(username=username, password=password)
|
||||
authenticator = Pferd._get_authenticator(username=username, password=password)
|
||||
PRETTY.starting_synchronizer(target, "ILIAS", course_id)
|
||||
|
||||
organizer = self._ilias(
|
||||
@ -157,7 +171,8 @@ class Pferd(Location):
|
||||
transform=transform,
|
||||
download_strategy=download_strategy,
|
||||
clean=clean,
|
||||
timeout=timeout
|
||||
timeout=timeout,
|
||||
file_conflict_resolver=file_conflict_resolver
|
||||
)
|
||||
|
||||
self._download_summary.merge(organizer.download_summary)
|
||||
@ -182,6 +197,7 @@ class Pferd(Location):
|
||||
download_strategy: IliasDownloadStrategy = download_modified_or_new,
|
||||
clean: bool = True,
|
||||
timeout: int = 5,
|
||||
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
|
||||
) -> Organizer:
|
||||
"""
|
||||
Synchronizes a folder with the ILIAS instance of the KIT. This method will crawl the ILIAS
|
||||
@ -208,9 +224,11 @@ class Pferd(Location):
|
||||
clean {bool} -- Whether to clean up when the method finishes.
|
||||
timeout {int} -- The download timeout for opencast videos. Sadly needed due to a
|
||||
requests bug.
|
||||
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
|
||||
with overwriting or deleting files. The default always asks the user.
|
||||
"""
|
||||
# This authenticator only works with the KIT ilias instance.
|
||||
authenticator = KitShibbolethAuthenticator(username=username, password=password)
|
||||
authenticator = Pferd._get_authenticator(username, password)
|
||||
PRETTY.starting_synchronizer(target, "ILIAS", "Personal Desktop")
|
||||
|
||||
organizer = self._ilias(
|
||||
@ -223,13 +241,139 @@ class Pferd(Location):
|
||||
transform=transform,
|
||||
download_strategy=download_strategy,
|
||||
clean=clean,
|
||||
timeout=timeout
|
||||
timeout=timeout,
|
||||
file_conflict_resolver=file_conflict_resolver
|
||||
)
|
||||
|
||||
self._download_summary.merge(organizer.download_summary)
|
||||
|
||||
return organizer
|
||||
|
||||
@swallow_and_print_errors
|
||||
def ilias_kit_folder(
|
||||
self,
|
||||
target: PathLike,
|
||||
full_url: str,
|
||||
dir_filter: IliasDirectoryFilter = lambda x, y: True,
|
||||
transform: Transform = lambda x: x,
|
||||
cookies: Optional[PathLike] = None,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
download_strategy: IliasDownloadStrategy = download_modified_or_new,
|
||||
clean: bool = True,
|
||||
timeout: int = 5,
|
||||
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
|
||||
) -> Organizer:
|
||||
"""
|
||||
Synchronizes a folder with a given folder on the ILIAS instance of the KIT.
|
||||
|
||||
Arguments:
|
||||
target {Path} -- the target path to write the data to
|
||||
full_url {str} -- the full url of the folder/videos/course to crawl
|
||||
|
||||
Keyword Arguments:
|
||||
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
|
||||
crawler level, these directories and all of their content is skipped.
|
||||
(default: {lambdax:True})
|
||||
transform {Transform} -- A transformation function for the output paths. Return None
|
||||
to ignore a file. (default: {lambdax:x})
|
||||
cookies {Optional[Path]} -- The path to store and load cookies from.
|
||||
(default: {None})
|
||||
username {Optional[str]} -- The SCC username. If none is given, it will prompt
|
||||
the user. (default: {None})
|
||||
password {Optional[str]} -- The SCC password. If none is given, it will prompt
|
||||
the user. (default: {None})
|
||||
download_strategy {DownloadStrategy} -- A function to determine which files need to
|
||||
be downloaded. Can save bandwidth and reduce the number of requests.
|
||||
(default: {download_modified_or_new})
|
||||
clean {bool} -- Whether to clean up when the method finishes.
|
||||
timeout {int} -- The download timeout for opencast videos. Sadly needed due to a
|
||||
requests bug.
|
||||
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
|
||||
with overwriting or deleting files. The default always asks the user.
|
||||
"""
|
||||
# This authenticator only works with the KIT ilias instance.
|
||||
authenticator = Pferd._get_authenticator(username=username, password=password)
|
||||
PRETTY.starting_synchronizer(target, "ILIAS", "An ILIAS element by url")
|
||||
|
||||
if not full_url.startswith("https://ilias.studium.kit.edu"):
|
||||
raise FatalException("Not a valid KIT ILIAS URL")
|
||||
|
||||
organizer = self._ilias(
|
||||
target=target,
|
||||
base_url="https://ilias.studium.kit.edu/",
|
||||
crawl_function=lambda crawler: crawler.recursive_crawl_url(full_url),
|
||||
authenticator=authenticator,
|
||||
cookies=cookies,
|
||||
dir_filter=dir_filter,
|
||||
transform=transform,
|
||||
download_strategy=download_strategy,
|
||||
clean=clean,
|
||||
timeout=timeout,
|
||||
file_conflict_resolver=file_conflict_resolver
|
||||
)
|
||||
|
||||
self._download_summary.merge(organizer.download_summary)
|
||||
|
||||
return organizer
|
||||
|
||||
@swallow_and_print_errors
|
||||
def ipd_kit(
|
||||
self,
|
||||
target: Union[PathLike, Organizer],
|
||||
url: str,
|
||||
transform: Transform = lambda x: x,
|
||||
download_strategy: IpdDownloadStrategy = ipd_download_new_or_modified,
|
||||
clean: bool = True,
|
||||
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
|
||||
) -> Organizer:
|
||||
"""
|
||||
Synchronizes a folder with a DIVA playlist.
|
||||
|
||||
Arguments:
|
||||
target {Union[PathLike, Organizer]} -- The organizer / target folder to use.
|
||||
url {str} -- the url to the page
|
||||
|
||||
Keyword Arguments:
|
||||
transform {Transform} -- A transformation function for the output paths. Return None
|
||||
to ignore a file. (default: {lambdax:x})
|
||||
download_strategy {DivaDownloadStrategy} -- A function to determine which files need to
|
||||
be downloaded. Can save bandwidth and reduce the number of requests.
|
||||
(default: {diva_download_new})
|
||||
clean {bool} -- Whether to clean up when the method finishes.
|
||||
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
|
||||
with overwriting or deleting files. The default always asks the user.
|
||||
"""
|
||||
tmp_dir = self._tmp_dir.new_subdir()
|
||||
|
||||
if target is None:
|
||||
PRETTY.starting_synchronizer("None", "IPD", url)
|
||||
raise FatalException("Got 'None' as target directory, aborting")
|
||||
|
||||
if isinstance(target, Organizer):
|
||||
organizer = target
|
||||
else:
|
||||
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
|
||||
|
||||
PRETTY.starting_synchronizer(organizer.path, "IPD", url)
|
||||
|
||||
elements: List[IpdDownloadInfo] = IpdCrawler(url).crawl()
|
||||
transformed = apply_transform(transform, elements)
|
||||
|
||||
if self._test_run:
|
||||
self._print_transformables(transformed)
|
||||
return organizer
|
||||
|
||||
downloader = IpdDownloader(tmp_dir=tmp_dir, organizer=organizer, strategy=download_strategy)
|
||||
downloader.download_all(transformed)
|
||||
|
||||
if clean:
|
||||
organizer.cleanup()
|
||||
|
||||
self._download_summary.merge(organizer.download_summary)
|
||||
|
||||
return organizer
|
||||
|
||||
@swallow_and_print_errors
|
||||
def diva_kit(
|
||||
self,
|
||||
@ -237,7 +381,8 @@ class Pferd(Location):
|
||||
playlist_location: str,
|
||||
transform: Transform = lambda x: x,
|
||||
download_strategy: DivaDownloadStrategy = diva_download_new,
|
||||
clean: bool = True
|
||||
clean: bool = True,
|
||||
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
|
||||
) -> Organizer:
|
||||
"""
|
||||
Synchronizes a folder with a DIVA playlist.
|
||||
@ -254,6 +399,8 @@ class Pferd(Location):
|
||||
be downloaded. Can save bandwidth and reduce the number of requests.
|
||||
(default: {diva_download_new})
|
||||
clean {bool} -- Whether to clean up when the method finishes.
|
||||
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
|
||||
with overwriting or deleting files. The default always asks the user.
|
||||
"""
|
||||
tmp_dir = self._tmp_dir.new_subdir()
|
||||
|
||||
@ -269,7 +416,7 @@ class Pferd(Location):
|
||||
if isinstance(target, Organizer):
|
||||
organizer = target
|
||||
else:
|
||||
organizer = Organizer(self.resolve(to_path(target)))
|
||||
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
|
||||
|
||||
PRETTY.starting_synchronizer(organizer.path, "DIVA", playlist_id)
|
||||
|
||||
@ -288,4 +435,6 @@ class Pferd(Location):
|
||||
if clean:
|
||||
organizer.cleanup()
|
||||
|
||||
self._download_summary.merge(organizer.download_summary)
|
||||
|
||||
return organizer
|
||||
|
@ -5,6 +5,8 @@ only files whose names match a regex, or renaming files from one numbering
|
||||
scheme to another.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from pathlib import PurePath
|
||||
from typing import Callable, List, Optional, TypeVar
|
||||
@ -45,7 +47,8 @@ def apply_transform(
|
||||
|
||||
# Transform combinators
|
||||
|
||||
keep = lambda path: path
|
||||
def keep(path: PurePath) -> Optional[PurePath]:
|
||||
return path
|
||||
|
||||
def attempt(*args: Transform) -> Transform:
|
||||
def inner(path: PurePath) -> Optional[PurePath]:
|
||||
@ -125,3 +128,15 @@ def re_rename(regex: Regex, target: str) -> Transform:
|
||||
return path.with_name(target.format(*groups))
|
||||
return None
|
||||
return inner
|
||||
|
||||
|
||||
def sanitize_windows_path(path: PurePath) -> PurePath:
|
||||
"""
|
||||
A small function to escape characters that are forbidden in windows path names.
|
||||
This method is a no-op on other operating systems.
|
||||
"""
|
||||
# Escape windows illegal path characters
|
||||
if os.name == 'nt':
|
||||
sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x) for x in list(path.parts)]
|
||||
return PurePath(*sanitized_parts)
|
||||
return path
|
||||
|
28
README.md
28
README.md
@ -2,6 +2,7 @@
|
||||
|
||||
**P**rogramm zum **F**lotten, **E**infachen **R**unterladen von **D**ateien
|
||||
|
||||
- [Quickstart with `sync_url`](#quickstart-with-sync_url)
|
||||
- [Installation](#installation)
|
||||
- [Upgrading from 2.0.0 to 2.1.0+](#upgrading-from-200-to-210)
|
||||
- [Example setup](#example-setup)
|
||||
@ -12,6 +13,23 @@
|
||||
- [Transform combinators](#transform-combinators)
|
||||
- [A short, but commented example](#a-short-but-commented-example)
|
||||
|
||||
## Quickstart with `sync_url`
|
||||
|
||||
The `sync_url` program allows you to just synchronize a given ILIAS URL (of a
|
||||
course, a folder, your personal desktop, etc.) without any extra configuration
|
||||
or setting up. Download the program, open ILIAS, copy the URL from the address
|
||||
bar and pass it to sync_url.
|
||||
|
||||
It bundles everything it needs in one executable and is easy to
|
||||
use, but doesn't expose all the configuration options and tweaks a full install
|
||||
does.
|
||||
|
||||
1. Download the `sync_url` binary from the [latest release](https://github.com/Garmelon/PFERD/releases/latest).
|
||||
2. Recognize that you most likely need to enclose the URL in `""` quotes to prevent your shell from interpreting `&` and other symbols
|
||||
3. Run the binary in your terminal (`./sync_url` or `sync_url.exe` in the CMD) to see the help and use it. I'd recommend using the `--cookies` option.
|
||||
If you are on **Linux/Mac**, you need to *make the file executable* using `chmod +x <file>`.
|
||||
If you are on **Mac**, you need to allow this unverified program to run (see e.g. [here](https://www.switchingtomac.com/tutorials/osx/how-to-run-unverified-apps-on-macos/))
|
||||
|
||||
## Installation
|
||||
|
||||
Ensure that you have at least Python 3.8 installed.
|
||||
@ -19,7 +37,7 @@ Ensure that you have at least Python 3.8 installed.
|
||||
To install PFERD or update your installation to the latest version, run this
|
||||
wherever you want to install or have already installed PFERD:
|
||||
```
|
||||
$ pip install git+https://github.com/Garmelon/PFERD@v2.3.0
|
||||
$ pip install git+https://github.com/Garmelon/PFERD@v2.6.2
|
||||
```
|
||||
|
||||
The use of [venv] is recommended.
|
||||
@ -41,9 +59,9 @@ A full example setup and initial use could look like:
|
||||
$ mkdir Vorlesungen
|
||||
$ cd Vorlesungen
|
||||
$ python3 -m venv .venv
|
||||
$ .venv/bin/activate
|
||||
$ pip install git+https://github.com/Garmelon/PFERD@v2.3.0
|
||||
$ curl -O https://raw.githubusercontent.com/Garmelon/PFERD/v2.3.0/example_config.py
|
||||
$ source .venv/bin/activate
|
||||
$ pip install git+https://github.com/Garmelon/PFERD@v2.6.2
|
||||
$ curl -O https://raw.githubusercontent.com/Garmelon/PFERD/v2.6.2/example_config.py
|
||||
$ python3 example_config.py
|
||||
$ deactivate
|
||||
```
|
||||
@ -51,7 +69,7 @@ $ deactivate
|
||||
Subsequent runs of the program might look like:
|
||||
```
|
||||
$ cd Vorlesungen
|
||||
$ .venv/bin/activate
|
||||
$ source .venv/bin/activate
|
||||
$ python3 example_config.py
|
||||
$ deactivate
|
||||
```
|
||||
|
2
mypy.ini
2
mypy.ini
@ -3,5 +3,5 @@ disallow_untyped_defs = True
|
||||
disallow_incomplete_defs = True
|
||||
no_implicit_optional = True
|
||||
|
||||
[mypy-rich.*,bs4]
|
||||
[mypy-rich.*,bs4,keyring]
|
||||
ignore_missing_imports = True
|
||||
|
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@ -0,0 +1,4 @@
|
||||
requests>=2.21.0
|
||||
beautifulsoup4>=4.7.1
|
||||
rich>=2.1.0
|
||||
keyring>=21.5.0
|
5
setup.py
5
setup.py
@ -2,12 +2,13 @@ from setuptools import find_packages, setup
|
||||
|
||||
setup(
|
||||
name="PFERD",
|
||||
version="2.3.0",
|
||||
version="2.6.2",
|
||||
packages=find_packages(),
|
||||
install_requires=[
|
||||
"requests>=2.21.0",
|
||||
"beautifulsoup4>=4.7.1",
|
||||
"rich>=2.1.0"
|
||||
"rich>=2.1.0",
|
||||
"keyring>=21.5.0"
|
||||
],
|
||||
)
|
||||
|
||||
|
161
sync_url.py
Executable file
161
sync_url.py
Executable file
@ -0,0 +1,161 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
A simple script to download a course by name from ILIAS.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path, PurePath
|
||||
from typing import Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from PFERD import Pferd
|
||||
from PFERD.authenticators import KeyringAuthenticator, UserPassAuthenticator
|
||||
from PFERD.cookie_jar import CookieJar
|
||||
from PFERD.ilias import (IliasCrawler, IliasElementType,
|
||||
KitShibbolethAuthenticator)
|
||||
from PFERD.logging import PrettyLogger, enable_logging
|
||||
from PFERD.organizer import (ConflictType, FileConflictResolution,
|
||||
FileConflictResolver, resolve_prompt_user)
|
||||
from PFERD.transform import sanitize_windows_path
|
||||
from PFERD.utils import to_path
|
||||
|
||||
_LOGGER = logging.getLogger("sync_url")
|
||||
_PRETTY = PrettyLogger(_LOGGER)
|
||||
|
||||
|
||||
def _extract_credentials(file_path: Optional[str],
|
||||
username: Optional[str], password: Optional[str]) -> UserPassAuthenticator:
|
||||
if not file_path:
|
||||
return UserPassAuthenticator("KIT ILIAS Shibboleth", username, password)
|
||||
|
||||
if not Path(file_path).exists():
|
||||
_PRETTY.error("Credential file does not exist")
|
||||
sys.exit(1)
|
||||
|
||||
with open(file_path, "r") as file:
|
||||
first_line = file.read().splitlines()[0]
|
||||
read_name, *read_password = first_line.split(":", 1)
|
||||
|
||||
name = read_name if read_name else None
|
||||
password = read_password[0] if read_password else None
|
||||
return UserPassAuthenticator("KIT ILIAS Shibboleth", username=name, password=password)
|
||||
|
||||
|
||||
def _resolve_remote_first(_path: PurePath, _conflict: ConflictType) -> FileConflictResolution:
|
||||
return FileConflictResolution.DESTROY_EXISTING
|
||||
|
||||
|
||||
def _resolve_local_first(_path: PurePath, _conflict: ConflictType) -> FileConflictResolution:
|
||||
return FileConflictResolution.KEEP_EXISTING
|
||||
|
||||
|
||||
def _resolve_no_delete(_path: PurePath, conflict: ConflictType) -> FileConflictResolution:
|
||||
# Update files
|
||||
if conflict == ConflictType.FILE_OVERWRITTEN:
|
||||
return FileConflictResolution.DESTROY_EXISTING
|
||||
if conflict == ConflictType.MARKED_FILE_OVERWRITTEN:
|
||||
return FileConflictResolution.DESTROY_EXISTING
|
||||
# But do not delete them
|
||||
return FileConflictResolution.KEEP_EXISTING
|
||||
|
||||
|
||||
def main() -> None:
|
||||
enable_logging(name="sync_url")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--test-run", action="store_true")
|
||||
parser.add_argument('-c', '--cookies', nargs='?', default=None, help="File to store cookies in")
|
||||
parser.add_argument('-u', '--username', nargs='?', default=None, help="Username for Ilias")
|
||||
parser.add_argument('-p', '--password', nargs='?', default=None, help="Password for Ilias")
|
||||
parser.add_argument('--credential-file', nargs='?', default=None,
|
||||
help="Path to a file containing credentials for Ilias. The file must have "
|
||||
"one line in the following format: '<user>:<password>'")
|
||||
parser.add_argument("-k", "--keyring", action="store_true",
|
||||
help="Use the system keyring service for authentication")
|
||||
parser.add_argument('--no-videos', action="store_true", help="Don't download videos")
|
||||
parser.add_argument('--local-first', action="store_true",
|
||||
help="Don't prompt for confirmation, keep existing files")
|
||||
parser.add_argument('--remote-first', action="store_true",
|
||||
help="Don't prompt for confirmation, delete and overwrite local files")
|
||||
parser.add_argument('--no-delete', action="store_true",
|
||||
help="Don't prompt for confirmation, overwrite local files, don't delete")
|
||||
parser.add_argument('url', help="URL to the course page")
|
||||
parser.add_argument('folder', nargs='?', default=None, help="Folder to put stuff into")
|
||||
args = parser.parse_args()
|
||||
|
||||
cookie_jar = CookieJar(to_path(args.cookies) if args.cookies else None)
|
||||
session = cookie_jar.create_session()
|
||||
|
||||
if args.keyring:
|
||||
if not args.username:
|
||||
_PRETTY.error("Keyring auth selected but no --username passed!")
|
||||
return
|
||||
inner_auth: UserPassAuthenticator = KeyringAuthenticator(
|
||||
"KIT ILIAS Shibboleth", username=args.username, password=args.password
|
||||
)
|
||||
else:
|
||||
inner_auth = _extract_credentials(args.credential_file, args.username, args.password)
|
||||
|
||||
username, password = inner_auth.get_credentials()
|
||||
authenticator = KitShibbolethAuthenticator(inner_auth)
|
||||
|
||||
url = urlparse(args.url)
|
||||
|
||||
crawler = IliasCrawler(url.scheme + '://' + url.netloc, session,
|
||||
authenticator, lambda x, y: True)
|
||||
|
||||
cookie_jar.load_cookies()
|
||||
|
||||
if args.folder is None:
|
||||
element_name = crawler.find_element_name(args.url)
|
||||
if not element_name:
|
||||
print("Error, could not get element name. Please specify a folder yourself.")
|
||||
return
|
||||
folder = sanitize_windows_path(Path(element_name.replace("/", "-").replace("\\", "-")))
|
||||
cookie_jar.save_cookies()
|
||||
else:
|
||||
folder = Path(args.folder)
|
||||
|
||||
# files may not escape the pferd_root with relative paths
|
||||
# note: Path(Path.cwd, Path(folder)) == Path(folder) if it is an absolute path
|
||||
pferd_root = Path(Path.cwd(), Path(folder)).parent
|
||||
# Folder might be a *PurePath* at this point
|
||||
target = Path(folder).resolve().name
|
||||
pferd = Pferd(pferd_root, test_run=args.test_run)
|
||||
|
||||
def dir_filter(_: Path, element: IliasElementType) -> bool:
|
||||
if args.no_videos:
|
||||
return element not in [IliasElementType.VIDEO_FILE, IliasElementType.VIDEO_FOLDER]
|
||||
return True
|
||||
|
||||
if args.local_first:
|
||||
file_conflict_resolver: FileConflictResolver = _resolve_local_first
|
||||
elif args.no_delete:
|
||||
file_conflict_resolver = _resolve_no_delete
|
||||
elif args.remote_first:
|
||||
file_conflict_resolver = _resolve_remote_first
|
||||
else:
|
||||
file_conflict_resolver = resolve_prompt_user
|
||||
|
||||
pferd.enable_logging()
|
||||
|
||||
# fetch
|
||||
pferd.ilias_kit_folder(
|
||||
target=target,
|
||||
full_url=args.url,
|
||||
cookies=args.cookies,
|
||||
dir_filter=dir_filter,
|
||||
username=username,
|
||||
password=password,
|
||||
file_conflict_resolver=file_conflict_resolver,
|
||||
transform=sanitize_windows_path
|
||||
)
|
||||
|
||||
pferd.print_summary()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user