diff --git a/PFERD/authenticators.py b/PFERD/authenticators.py deleted file mode 100644 index f85c9d3..0000000 --- a/PFERD/authenticators.py +++ /dev/null @@ -1,214 +0,0 @@ -""" -General authenticators useful in many situations -""" - -import getpass -import logging -from typing import Optional, Tuple - -from .logging import PrettyLogger - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - -try: - import keyring -except ImportError: - pass - - -class TfaAuthenticator: - # pylint: disable=too-few-public-methods - """ - An authenticator for a TFA token. Always prompts the user, as the token can not be cached. - """ - - def __init__(self, reason: str): - """ - Create a new tfa authenticator. - - Arguments: - reason {str} -- the reason for obtaining the credentials - """ - self._reason = reason - - def get_token(self) -> str: - # pylint: disable=no-self-use - """ - Prompts the user for the token and returns it. - """ - print(f"Enter credentials ({self._reason})") - return getpass.getpass("TFA Token: ") - - -class UserPassAuthenticator: - """ - An authenticator for username-password combinations that prompts the user - for missing information. - """ - - def __init__( - self, - reason: str, - username: Optional[str] = None, - password: Optional[str] = None, - ) -> None: - """ - reason - what the credentials are used for - username - the username (if already known) - password - the password (if already known) - """ - - self._reason = reason - - self._given_username = username - self._given_password = password - - self._username = username - self._password = password - - def get_credentials(self) -> Tuple[str, str]: - """ - Returns a tuple (username, password). Prompts user for username or - password when necessary. - """ - - if self._username is None and self._given_username is not None: - self._username = self._given_username - - if self._password is None and self._given_password is not None: - self._password = self._given_password - - if self._username is None or self._password is None: - print(f"Enter credentials ({self._reason})") - - username: str - if self._username is None: - username = input("Username: ") - self._username = username - else: - username = self._username - - password: str - if self._password is None: - password = getpass.getpass(prompt="Password: ") - self._password = password - else: - password = self._password - - return (username, password) - - @property - def username(self) -> str: - """ - The username. Accessing this property may cause the authenticator to - prompt the user. - """ - - (username, _) = self.get_credentials() - return username - - @property - def password(self) -> str: - """ - The password. Accessing this property may cause the authenticator to - prompt the user. - """ - - (_, password) = self.get_credentials() - return password - - def invalidate_credentials(self) -> None: - """ - Marks the credentials as invalid. If only a username was supplied in - the constructor, assumes that the username is valid and only the - password is invalid. If only a password was supplied in the - constructor, assumes that the password is valid and only the username - is invalid. Otherwise, assumes that username and password are both - invalid. 
- """ - - self._username = None - self._password = None - - if self._given_username is not None and self._given_password is not None: - self._given_username = None - self._given_password = None - - -class KeyringAuthenticator(UserPassAuthenticator): - """ - An authenticator for username-password combinations that stores the - password using the system keyring service and prompts the user for missing - information. - """ - - def get_credentials(self) -> Tuple[str, str]: - """ - Returns a tuple (username, password). Prompts user for username or - password when necessary. - """ - - if self._username is None and self._given_username is not None: - self._username = self._given_username - - if self._password is None and self._given_password is not None: - self._password = self._given_password - - if self._username is not None and self._password is None: - self._load_password() - - if self._username is None or self._password is None: - print(f"Enter credentials ({self._reason})") - - username: str - if self._username is None: - username = input("Username: ") - self._username = username - else: - username = self._username - - if self._password is None: - self._load_password() - - password: str - if self._password is None: - password = getpass.getpass(prompt="Password: ") - self._password = password - self._save_password() - else: - password = self._password - - return (username, password) - - def _load_password(self) -> None: - """ - Loads the saved password associated with self._username from the system - keyring service (or None if not password has been saved yet) and stores - it in self._password. - """ - self._password = keyring.get_password("pferd-ilias", self._username) - - def _save_password(self) -> None: - """ - Saves self._password to the system keyring service and associates it - with self._username. - """ - keyring.set_password("pferd-ilias", self._username, self._password) - - def invalidate_credentials(self) -> None: - """ - Marks the credentials as invalid. If only a username was supplied in - the constructor, assumes that the username is valid and only the - password is invalid. If only a password was supplied in the - constructor, assumes that the password is valid and only the username - is invalid. Otherwise, assumes that username and password are both - invalid. - """ - - try: - keyring.delete_password("pferd-ilias", self._username) - except keyring.errors.PasswordDeleteError: - pass - - super().invalidate_credentials() diff --git a/PFERD/cookie_jar.py b/PFERD/cookie_jar.py deleted file mode 100644 index e5b568f..0000000 --- a/PFERD/cookie_jar.py +++ /dev/null @@ -1,69 +0,0 @@ -"""A helper for requests cookies.""" - -import logging -from http.cookiejar import LoadError, LWPCookieJar -from pathlib import Path -from typing import Optional - -import requests - -LOGGER = logging.getLogger(__name__) - - -class CookieJar: - """A cookie jar that can be persisted.""" - - def __init__(self, cookie_file: Optional[Path] = None) -> None: - """Create a new cookie jar at the given path. - - If the path is None, the cookies will not be persisted. 
- """ - self._cookies: LWPCookieJar - if cookie_file is None: - self._cookies = LWPCookieJar() - else: - self._cookies = LWPCookieJar(str(cookie_file.resolve())) - - @property - def cookies(self) -> LWPCookieJar: - """Return the requests cookie jar.""" - return self._cookies - - def load_cookies(self) -> None: - """Load all cookies from the file given in the constructor.""" - if self._cookies.filename is None: - return - - try: - LOGGER.info("Loading old cookies from %s", self._cookies.filename) - self._cookies.load(ignore_discard=True) - except (FileNotFoundError, LoadError): - LOGGER.warning( - "No valid cookie file found at %s, continuing with no cookies", - self._cookies.filename - ) - - def save_cookies(self, reason: Optional[str] = None) -> None: - """Save the cookies in the file given in the constructor.""" - if self._cookies.filename is None: - return - - if reason is None: - LOGGER.info("Saving cookies") - else: - LOGGER.info("Saving cookies (%s)", reason) - - # TODO figure out why ignore_discard is set - # TODO possibly catch a few more exceptions - self._cookies.save(ignore_discard=True) - - def create_session(self) -> requests.Session: - """Create a new session using the cookie jar.""" - sess = requests.Session() - - # From the request docs: "All requests code should work out of the box - # with externally provided instances of CookieJar, e.g. LWPCookieJar - # and FileCookieJar." - sess.cookies = self.cookies # type: ignore - - return sess diff --git a/PFERD/diva.py b/PFERD/diva.py deleted file mode 100644 index 148fa56..0000000 --- a/PFERD/diva.py +++ /dev/null @@ -1,169 +0,0 @@ -""" -Utility functions and a scraper/downloader for the KIT DIVA portal. -""" -import logging -import re -from dataclasses import dataclass -from pathlib import Path -from typing import Any, Callable, List, Optional - -import requests - -from .errors import FatalException -from .logging import PrettyLogger -from .organizer import Organizer -from .tmp_dir import TmpDir -from .transform import Transformable -from .utils import stream_to_path - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -@dataclass -class DivaDownloadInfo(Transformable): - """ - Information about a DIVA video - """ - url: str - - -DivaDownloadStrategy = Callable[[Organizer, DivaDownloadInfo], bool] - - -def diva_download_new(organizer: Organizer, info: DivaDownloadInfo) -> bool: - """ - Accepts only new files. - """ - resolved_file = organizer.resolve(info.path) - if not resolved_file.exists(): - return True - PRETTY.ignored_file(info.path, "local file exists") - return False - - -class DivaPlaylistCrawler: - # pylint: disable=too-few-public-methods - """ - A crawler for DIVA playlists. - """ - - _PLAYLIST_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/detail/" - _COLLECTION_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/collection.json" - - def __init__(self, playlist_id: str): - self._id = playlist_id - - @classmethod - def fetch_id(cls, playlist_link: str) -> str: - """ - Fetches the ID for a playerlist, given the base link - (e.g. https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271). - - Raises a FatalException, if the id can not be resolved - """ - match = re.match(r".+#/details/(.+)", playlist_link) - if match is None: - raise FatalException( - "DIVA: Invalid playlist link format, could not extract details." 
- ) - base_name = match.group(1) - - response = requests.get(cls._PLAYLIST_BASE_URL + base_name + ".json") - - if response.status_code != 200: - raise FatalException( - f"DIVA: Got non-200 status code ({response.status_code}))" - f"when requesting {response.url!r}!" - ) - - body = response.json() - - if body["error"]: - raise FatalException(f"DIVA: Server returned error {body['error']!r}.") - - return body["result"]["collection"]["id"] - - def crawl(self) -> List[DivaDownloadInfo]: - """ - Crawls the playlist given in the constructor. - """ - response = requests.get(self._COLLECTION_BASE_URL, params={"collection": self._id}) - if response.status_code != 200: - raise FatalException(f"Server returned status {response.status_code}.") - - body = response.json() - - if body["error"]: - raise FatalException(f"Server returned error {body['error']!r}.") - - result = body["result"] - - if result["resultCount"] > result["pageSize"]: - PRETTY.warning("Did not receive all results, some will be missing") - - download_infos: List[DivaDownloadInfo] = [] - - for video in result["resultList"]: - title = video["title"] - collection_title = self._follow_path(["collection", "title"], video) - url = self._follow_path( - ["resourceList", "derivateList", "mp4", "url"], - video - ) - - if url and collection_title and title: - path = Path(collection_title, title + ".mp4") - download_infos.append(DivaDownloadInfo(path, url)) - else: - PRETTY.warning(f"Incomplete video found: {title!r} {collection_title!r} {url!r}") - - return download_infos - - @staticmethod - def _follow_path(path: List[str], obj: Any) -> Optional[Any]: - """ - Follows a property path through an object, bailing at the first None. - """ - current = obj - for path_step in path: - if path_step in current: - current = current[path_step] - else: - return None - return current - - -class DivaDownloader: - """ - A downloader for DIVA videos. - """ - - def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: DivaDownloadStrategy): - self._tmp_dir = tmp_dir - self._organizer = organizer - self._strategy = strategy - self._session = requests.session() - - def download_all(self, infos: List[DivaDownloadInfo]) -> None: - """ - Download multiple files one after the other. - """ - for info in infos: - self.download(info) - - def download(self, info: DivaDownloadInfo) -> None: - """ - Download a single file. - """ - if not self._strategy(self._organizer, info): - self._organizer.mark(info.path) - return - - with self._session.get(info.url, stream=True) as response: - if response.status_code == 200: - tmp_file = self._tmp_dir.new_path() - stream_to_path(response, tmp_file, info.path.name) - self._organizer.accept_file(tmp_file, info.path) - else: - PRETTY.warning(f"Could not download file, got response {response.status_code}") diff --git a/PFERD/download_summary.py b/PFERD/download_summary.py deleted file mode 100644 index 3b9a024..0000000 --- a/PFERD/download_summary.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Provides a summary that keeps track of new modified or deleted files. -""" -from pathlib import Path -from typing import List - - -def _mergeNoDuplicate(first: List[Path], second: List[Path]) -> List[Path]: - tmp = list(set(first + second)) - tmp.sort(key=lambda x: str(x.resolve())) - return tmp - - -class DownloadSummary: - """ - Keeps track of all new, modified or deleted files and provides a summary. 
- """ - - def __init__(self) -> None: - self._new_files: List[Path] = [] - self._modified_files: List[Path] = [] - self._deleted_files: List[Path] = [] - - @property - def new_files(self) -> List[Path]: - """ - Returns all new files. - """ - return self._new_files.copy() - - @property - def modified_files(self) -> List[Path]: - """ - Returns all modified files. - """ - return self._modified_files.copy() - - @property - def deleted_files(self) -> List[Path]: - """ - Returns all deleted files. - """ - return self._deleted_files.copy() - - def merge(self, summary: 'DownloadSummary') -> None: - """ - Merges ourselves with the passed summary. Modifies this object, but not the passed one. - """ - self._new_files = _mergeNoDuplicate(self._new_files, summary.new_files) - self._modified_files = _mergeNoDuplicate(self._modified_files, summary.modified_files) - self._deleted_files = _mergeNoDuplicate(self._deleted_files, summary.deleted_files) - - def add_deleted_file(self, path: Path) -> None: - """ - Registers a file as deleted. - """ - self._deleted_files.append(path) - - def add_modified_file(self, path: Path) -> None: - """ - Registers a file as changed. - """ - self._modified_files.append(path) - - def add_new_file(self, path: Path) -> None: - """ - Registers a file as new. - """ - self._new_files.append(path) - - def has_updates(self) -> bool: - """ - Returns whether this summary has any updates. - """ - return bool(self._new_files or self._modified_files or self._deleted_files) diff --git a/PFERD/downloaders.py b/PFERD/downloaders.py deleted file mode 100644 index 94b8b9f..0000000 --- a/PFERD/downloaders.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -General downloaders useful in many situations -""" - -from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional - -import requests -import requests.auth - -from .organizer import Organizer -from .tmp_dir import TmpDir -from .transform import Transformable -from .utils import stream_to_path - - -@dataclass -class HttpDownloadInfo(Transformable): - """ - This class describes a single file to be downloaded. - """ - - url: str - parameters: Dict[str, Any] = field(default_factory=dict) - - -class HttpDownloader: - """A HTTP downloader that can handle HTTP basic auth.""" - - def __init__( - self, - tmp_dir: TmpDir, - organizer: Organizer, - username: Optional[str], - password: Optional[str], - ): - """Create a new http downloader.""" - self._organizer = organizer - self._tmp_dir = tmp_dir - self._username = username - self._password = password - self._session = self._build_session() - - def _build_session(self) -> requests.Session: - session = requests.Session() - if self._username and self._password: - session.auth = requests.auth.HTTPBasicAuth( - self._username, self._password - ) - return session - - def download_all(self, infos: List[HttpDownloadInfo]) -> None: - """ - Download multiple files one after the other. - """ - - for info in infos: - self.download(info) - - def download(self, info: HttpDownloadInfo) -> None: - """ - Download a single file. 
- """ - - with self._session.get(info.url, params=info.parameters, stream=True) as response: - if response.status_code == 200: - tmp_file = self._tmp_dir.new_path() - stream_to_path(response, tmp_file, info.path.name) - self._organizer.accept_file(tmp_file, info.path) - else: - # TODO use proper exception - raise Exception(f"Could not download file, got response {response.status_code}") diff --git a/PFERD/errors.py b/PFERD/errors.py deleted file mode 100644 index d960e13..0000000 --- a/PFERD/errors.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -An error logging decorator. -""" - -import logging -from typing import Any, Callable, TypeVar, cast - -from rich.console import Console - -from .logging import PrettyLogger - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -class FatalException(Exception): - """ - A fatal exception occurred. Recovery is not possible. - """ - - -TFun = TypeVar('TFun', bound=Callable[..., Any]) - - -def swallow_and_print_errors(function: TFun) -> TFun: - """ - Decorates a function, swallows all errors, logs them and returns none if one occurred. - """ - def inner(*args: Any, **kwargs: Any) -> Any: - # pylint: disable=broad-except - try: - return function(*args, **kwargs) - except FatalException as error: - PRETTY.error(str(error)) - return None - except Exception as error: - Console().print_exception() - return None - return cast(TFun, inner) - - -def retry_on_io_exception(max_retries: int, message: str) -> Callable[[TFun], TFun]: - """ - Decorates a function and retries it on any exception until the max retries count is hit. - """ - def retry(function: TFun) -> TFun: - def inner(*args: Any, **kwargs: Any) -> Any: - for i in range(0, max_retries): - # pylint: disable=broad-except - try: - return function(*args, **kwargs) - except IOError as error: - PRETTY.warning(f"Error duing operation '{message}': {error}") - PRETTY.warning( - f"Retrying operation '{message}'. Remaining retries: {max_retries - 1 - i}") - return cast(TFun, inner) - return retry diff --git a/PFERD/ilias/__init__.py b/PFERD/ilias/__init__.py deleted file mode 100644 index 0a5f08b..0000000 --- a/PFERD/ilias/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -Synchronizing files from ILIAS instances (https://www.ilias.de/). -""" - -from .authenticators import IliasAuthenticator, KitShibbolethAuthenticator -from .crawler import (IliasCrawler, IliasCrawlerEntry, IliasDirectoryFilter, - IliasElementType) -from .downloader import (IliasDownloader, IliasDownloadInfo, - IliasDownloadStrategy, download_everything, - download_modified_or_new) diff --git a/PFERD/ilias/authenticators.py b/PFERD/ilias/authenticators.py deleted file mode 100644 index 4b99dd8..0000000 --- a/PFERD/ilias/authenticators.py +++ /dev/null @@ -1,138 +0,0 @@ -""" -Authenticators that can obtain proper ILIAS session cookies. -""" - -import abc -import logging -from typing import Optional - -import bs4 -import requests - -from ..authenticators import TfaAuthenticator, UserPassAuthenticator -from ..utils import soupify - -LOGGER = logging.getLogger(__name__) - - -class IliasAuthenticator(abc.ABC): - # pylint: disable=too-few-public-methods - - """ - An authenticator that logs an existing requests session into an ILIAS - account. - """ - - @abc.abstractmethod - def authenticate(self, sess: requests.Session) -> None: - """ - Log a requests session into this authenticator's ILIAS account. 
- """ - - -class KitShibbolethAuthenticator(IliasAuthenticator): - # pylint: disable=too-few-public-methods - - """ - Authenticate via KIT's shibboleth system. - """ - - def __init__(self, authenticator: Optional[UserPassAuthenticator] = None) -> None: - if authenticator: - self._auth = authenticator - else: - self._auth = UserPassAuthenticator("KIT ILIAS Shibboleth") - - self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth") - - def authenticate(self, sess: requests.Session) -> None: - """ - Performs the ILIAS Shibboleth authentication dance and saves the login - cookies it receieves. - - This function should only be called whenever it is detected that you're - not logged in. The cookies obtained should be good for a few minutes, - maybe even an hour or two. - """ - - # Equivalent: Click on "Mit KIT-Account anmelden" button in - # https://ilias.studium.kit.edu/login.php - LOGGER.debug("Begin authentication process with ILIAS") - url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login" - data = { - "sendLogin": "1", - "idp_selection": "https://idp.scc.kit.edu/idp/shibboleth", - "target": "/shib_login.php", - "home_organization_selection": "Mit KIT-Account anmelden", - } - soup = soupify(sess.post(url, data=data)) - - # Attempt to login using credentials, if necessary - while not self._login_successful(soup): - # Searching the form here so that this fails before asking for - # credentials rather than after asking. - form = soup.find("form", {"class": "full content", "method": "post"}) - action = form["action"] - - csrf_token = form.find("input", {"name": "csrf_token"})["value"] - - # Equivalent: Enter credentials in - # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO - LOGGER.debug("Attempt to log in to Shibboleth using credentials") - url = "https://idp.scc.kit.edu" + action - data = { - "_eventId_proceed": "", - "j_username": self._auth.username, - "j_password": self._auth.password, - "csrf_token": csrf_token - } - soup = soupify(sess.post(url, data=data)) - - if self._tfa_required(soup): - soup = self._authenticate_tfa(sess, soup) - - if not self._login_successful(soup): - print("Incorrect credentials.") - self._auth.invalidate_credentials() - - # Equivalent: Being redirected via JS automatically - # (or clicking "Continue" if you have JS disabled) - LOGGER.debug("Redirect back to ILIAS with login information") - relay_state = soup.find("input", {"name": "RelayState"}) - saml_response = soup.find("input", {"name": "SAMLResponse"}) - url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST" - data = { # using the info obtained in the while loop above - "RelayState": relay_state["value"], - "SAMLResponse": saml_response["value"], - } - sess.post(url, data=data) - - def _authenticate_tfa( - self, - session: requests.Session, - soup: bs4.BeautifulSoup - ) -> bs4.BeautifulSoup: - # Searching the form here so that this fails before asking for - # credentials rather than after asking. 
- form = soup.find("form", {"method": "post"}) - action = form["action"] - - # Equivalent: Enter token in - # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO - LOGGER.debug("Attempt to log in to Shibboleth with TFA token") - url = "https://idp.scc.kit.edu" + action - data = { - "_eventId_proceed": "", - "j_tokenNumber": self._tfa_auth.get_token() - } - return soupify(session.post(url, data=data)) - - @staticmethod - def _login_successful(soup: bs4.BeautifulSoup) -> bool: - relay_state = soup.find("input", {"name": "RelayState"}) - saml_response = soup.find("input", {"name": "SAMLResponse"}) - return relay_state is not None and saml_response is not None - - @staticmethod - def _tfa_required(soup: bs4.BeautifulSoup) -> bool: - return soup.find(id="j_tokenNumber") is not None diff --git a/PFERD/ilias/crawler.py b/PFERD/ilias/crawler.py deleted file mode 100644 index edab284..0000000 --- a/PFERD/ilias/crawler.py +++ /dev/null @@ -1,684 +0,0 @@ -""" -Contains an ILIAS crawler alongside helper functions. -""" - -import datetime -import json -import logging -import re -from enum import Enum -from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union -from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit, - urlunsplit) - -import bs4 -import requests - -from ..errors import FatalException, retry_on_io_exception -from ..logging import PrettyLogger -from ..utils import soupify -from .authenticators import IliasAuthenticator -from .date_demangler import demangle_date -from .downloader import IliasDownloadInfo - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -def _sanitize_path_name(name: str) -> str: - return name.replace("/", "-").replace("\\", "-") - - -class IliasElementType(Enum): - """ - The type of an ilias element. - """ - REGULAR_FOLDER = "REGULAR_FOLDER" - VIDEO_FOLDER = "VIDEO_FOLDER" - EXERCISE_FOLDER = "EXERCISE_FOLDER" - REGULAR_FILE = "REGULAR_FILE" - VIDEO_FILE = "VIDEO_FILE" - FORUM = "FORUM" - MEETING = "MEETING" - EXTERNAL_LINK = "EXTERNAL_LINK" - - def is_folder(self) -> bool: - """ - Returns whether this type is some kind of folder. - """ - return "FOLDER" in str(self.name) - - -IliasDirectoryFilter = Callable[[Path, IliasElementType], bool] - - -class IliasCrawlerEntry: - # pylint: disable=too-few-public-methods - """ - An ILIAS crawler entry used internally to find, catalogue and recursively crawl elements. - """ - - def __init__( - self, - path: Path, - url: Union[str, Callable[[], Optional[str]]], - entry_type: IliasElementType, - modification_date: Optional[datetime.datetime] - ): - self.path = path - if isinstance(url, str): - str_url = url - self.url: Callable[[], Optional[str]] = lambda: str_url - else: - self.url = url - self.entry_type = entry_type - self.modification_date = modification_date - - def to_download_info(self) -> Optional[IliasDownloadInfo]: - """ - Converts this crawler entry to an IliasDownloadInfo, if possible. - This method will only succeed for *File* types. - """ - if self.entry_type in [IliasElementType.REGULAR_FILE, IliasElementType.VIDEO_FILE]: - return IliasDownloadInfo(self.path, self.url, self.modification_date) - return None - - -class IliasCrawler: - # pylint: disable=too-few-public-methods - - """ - A crawler for ILIAS. - """ - - # pylint: disable=too-many-arguments - def __init__( - self, - base_url: str, - session: requests.Session, - authenticator: IliasAuthenticator, - dir_filter: IliasDirectoryFilter - ): - """ - Create a new ILIAS crawler. 
- """ - - self._base_url = base_url - self._session = session - self._authenticator = authenticator - self.dir_filter = dir_filter - - @staticmethod - def _url_set_query_param(url: str, param: str, value: str) -> str: - """ - Set a query parameter in an url, overwriting existing ones with the same name. - """ - scheme, netloc, path, query, fragment = urlsplit(url) - query_parameters = parse_qs(query) - query_parameters[param] = [value] - new_query_string = urlencode(query_parameters, doseq=True) - - return urlunsplit((scheme, netloc, path, new_query_string, fragment)) - - def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]: - """ - Crawls a given url *and all reachable elements in it*. - - Args: - url {str} -- the *full* url to crawl - """ - start_entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), url) - return self._iterate_entries_to_download_infos(start_entries) - - def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]: - """ - Starts the crawl process for a course, yielding a list of elements to (potentially) - download. - - Arguments: - course_id {str} -- the course id - - Raises: - FatalException: if an unrecoverable error occurs or the course id is not valid - """ - # Start crawling at the given course - root_url = self._url_set_query_param( - self._base_url + "/goto.php", "target", f"crs_{course_id}" - ) - - if not self._is_course_id_valid(root_url, course_id): - raise FatalException( - "Invalid course id? I didn't find anything looking like a course!" - ) - - # And treat it as a folder - entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), root_url) - return self._iterate_entries_to_download_infos(entries) - - def _is_course_id_valid(self, root_url: str, course_id: str) -> bool: - response: requests.Response = self._session.get(root_url) - # We were redirected ==> Non-existant ID - if course_id not in response.url: - return False - - link_element: bs4.Tag = self._get_page(root_url, {}).find(id="current_perma_link") - if not link_element: - return False - # It wasn't a course but a category list, forum, etc. - return "crs_" in link_element.get("value") - - def find_course_name(self, course_id: str) -> Optional[str]: - """ - Returns the name of a given course. None if it is not a valid course - or it could not be found. - """ - course_url = self._url_set_query_param( - self._base_url + "/goto.php", "target", f"crs_{course_id}" - ) - return self.find_element_name(course_url) - - def find_element_name(self, url: str) -> Optional[str]: - """ - Returns the name of the element at the given URL, if it can find one. - """ - focus_element: bs4.Tag = self._get_page(url, {}).find(id="il_mhead_t_focus") - if not focus_element: - return None - return focus_element.text - - def crawl_personal_desktop(self) -> List[IliasDownloadInfo]: - """ - Crawls the ILIAS personal desktop (and every subelements that can be reached from there). 
- - Raises: - FatalException: if an unrecoverable error occurs - """ - entries: List[IliasCrawlerEntry] = self._crawl_folder( - Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI" - ) - return self._iterate_entries_to_download_infos(entries) - - def _iterate_entries_to_download_infos( - self, - entries: List[IliasCrawlerEntry] - ) -> List[IliasDownloadInfo]: - result: List[IliasDownloadInfo] = [] - entries_to_process: List[IliasCrawlerEntry] = entries.copy() - while len(entries_to_process) > 0: - entry = entries_to_process.pop() - - if entry.entry_type == IliasElementType.EXTERNAL_LINK: - PRETTY.not_searching(entry.path, "external link") - continue - if entry.entry_type == IliasElementType.FORUM: - PRETTY.not_searching(entry.path, "forum") - continue - - if entry.entry_type.is_folder() and not self.dir_filter(entry.path, entry.entry_type): - PRETTY.not_searching(entry.path, "user filter") - continue - - download_info = entry.to_download_info() - if download_info is not None: - result.append(download_info) - continue - - url = entry.url() - - if url is None: - PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it") - continue - - PRETTY.searching(entry.path) - - if entry.entry_type == IliasElementType.EXERCISE_FOLDER: - entries_to_process += self._crawl_exercises(entry.path, url) - continue - if entry.entry_type == IliasElementType.REGULAR_FOLDER: - entries_to_process += self._crawl_folder(entry.path, url) - continue - if entry.entry_type == IliasElementType.VIDEO_FOLDER: - entries_to_process += self._crawl_video_directory(entry.path, url) - continue - - PRETTY.warning(f"Unknown type: {entry.entry_type}!") - - return result - - def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]: - """ - Crawl all files in a folder-like element. 
- """ - soup = self._get_page(url, {}) - - if soup.find(id="headerimage"): - element: bs4.Tag = soup.find(id="headerimage") - if "opencast" in element.attrs["src"].lower(): - PRETTY.warning(f"Switched to crawling a video at {folder_path}") - if not self.dir_filter(folder_path, IliasElementType.VIDEO_FOLDER): - PRETTY.not_searching(folder_path, "user filter") - return [] - return self._crawl_video_directory(folder_path, url) - - result: List[IliasCrawlerEntry] = [] - - # Fetch all links and throw them to the general interpreter - links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle") - for link in links: - abs_url = self._abs_url_from_link(link) - element_path = Path(folder_path, _sanitize_path_name(link.getText().strip())) - element_type = self._find_type_from_link(element_path, link, abs_url) - - if element_type == IliasElementType.REGULAR_FILE: - result += self._crawl_file(folder_path, link, abs_url) - elif element_type == IliasElementType.MEETING: - meeting_name = str(element_path.name) - date_portion_str = meeting_name.split(" - ")[0] - date_portion = demangle_date(date_portion_str) - - if not date_portion: - result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)] - continue - - rest_of_name = meeting_name - if rest_of_name.startswith(date_portion_str): - rest_of_name = rest_of_name[len(date_portion_str):] - - new_name = datetime.datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") \ - + rest_of_name - new_path = Path(folder_path, _sanitize_path_name(new_name)) - result += [ - IliasCrawlerEntry(new_path, abs_url, IliasElementType.REGULAR_FOLDER, None) - ] - elif element_type is not None: - result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)] - else: - PRETTY.warning(f"Found element without a type at {str(element_path)!r}") - - return result - - def _abs_url_from_link(self, link_tag: bs4.Tag) -> str: - """ - Create an absolute url from an tag. - """ - return urljoin(self._base_url, link_tag.get("href")) - - @staticmethod - def _find_type_from_link( - path: Path, - link_element: bs4.Tag, - url: str - ) -> Optional[IliasElementType]: - """ - Decides which sub crawler to use for a given top level element. - """ - parsed_url = urlparse(url) - LOGGER.debug("Parsed url: %r", parsed_url) - - # file URLs contain "target=file" - if "target=file_" in parsed_url.query: - return IliasElementType.REGULAR_FILE - - # Skip forums - if "cmd=showThreads" in parsed_url.query: - return IliasElementType.FORUM - - # Everything with a ref_id can *probably* be opened to reveal nested things - # video groups, directories, exercises, etc - if "ref_id=" in parsed_url.query: - return IliasCrawler._find_type_from_folder_like(link_element, url) - - PRETTY.warning( - "Got unknown element type in switch. I am not sure what horror I found on the" - f" ILIAS page. The element was at {str(path)!r} and it is {link_element!r})" - ) - return None - - @staticmethod - def _find_type_from_folder_like(link_element: bs4.Tag, url: str) -> Optional[IliasElementType]: - """ - Try crawling something that looks like a folder. 
- """ - # pylint: disable=too-many-return-statements - - found_parent: Optional[bs4.Tag] = None - - # We look for the outer div of our inner link, to find information around it - # (mostly the icon) - for parent in link_element.parents: - if "ilContainerListItemOuter" in parent["class"]: - found_parent = parent - break - - if found_parent is None: - PRETTY.warning(f"Could not find element icon for {url!r}") - return None - - # Find the small descriptive icon to figure out the type - img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon") - - if img_tag is None: - PRETTY.warning(f"Could not find image tag for {url!r}") - return None - - if "opencast" in str(img_tag["alt"]).lower(): - return IliasElementType.VIDEO_FOLDER - - if str(img_tag["src"]).endswith("icon_exc.svg"): - return IliasElementType.EXERCISE_FOLDER - - if str(img_tag["src"]).endswith("icon_webr.svg"): - return IliasElementType.EXTERNAL_LINK - - if str(img_tag["src"]).endswith("frm.svg"): - return IliasElementType.FORUM - - if str(img_tag["src"]).endswith("sess.svg"): - return IliasElementType.MEETING - - return IliasElementType.REGULAR_FOLDER - - @staticmethod - def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasCrawlerEntry]: - """ - Crawls a file. - """ - # Files have a list of properties (type, modification date, size, etc.) - # In a series of divs. - # Find the parent containing all those divs, so we can filter our what we need - properties_parent: bs4.Tag = link_element.findParent( - "div", {"class": lambda x: "il_ContainerListItem" in x} - ).select_one(".il_ItemProperties") - # The first one is always the filetype - file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip() - - # The rest does not have a stable order. Grab the whole text and reg-ex the date - # out of it - all_properties_text = properties_parent.getText().strip() - modification_date_match = re.search( - r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)", - all_properties_text - ) - if modification_date_match is None: - modification_date = None - PRETTY.warning(f"Could not extract start date from {all_properties_text!r}") - else: - modification_date_str = modification_date_match.group(1) - modification_date = demangle_date(modification_date_str) - - # Grab the name from the link text - name = _sanitize_path_name(link_element.getText()) - full_path = Path(path, name + "." + file_type) - - return [ - IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date) - ] - - def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]: - """ - Crawl the video overview site. - """ - initial_soup = self._get_page(url, {}) - - # The page is actually emtpy but contains a much needed token in the link below. - # That token can be used to fetch the *actual* video listing - content_link: bs4.Tag = initial_soup.select_one("#tab_series a") - # Fetch the actual video listing. 
The given parameters return all videos (max 800) - # in a standalone html page - video_list_soup = self._get_page( - self._abs_url_from_link(content_link), - {"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} - ) - - # If we find a page selected, we probably need to respect pagination - if self._is_paginated_video_page(video_list_soup): - second_stage_url = self._abs_url_from_link(content_link) - - return self._crawl_paginated_video_directory( - video_dir_path, video_list_soup, second_stage_url - ) - - return self._crawl_video_directory_second_stage(video_dir_path, video_list_soup) - - @staticmethod - def _is_paginated_video_page(soup: bs4.BeautifulSoup) -> bool: - return soup.find(id=re.compile(r"tab_page_sel.+")) is not None - - def _crawl_paginated_video_directory( - self, - video_dir_path: Path, - paged_video_list_soup: bs4.BeautifulSoup, - second_stage_url: str - ) -> List[IliasCrawlerEntry]: - LOGGER.info("Found paginated video page, trying 800 elements") - - # Try to find the table id. This can be used to build the query parameter indicating - # you want 800 elements - - table_element: bs4.Tag = paged_video_list_soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) - if table_element is None: - PRETTY.warning( - "Could not increase elements per page (table not found)." - " Some might not be crawled!" - ) - return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup) - - match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) - if match is None: - PRETTY.warning( - "Could not increase elements per page (table id not found)." - " Some might not be crawled!" - ) - return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup) - table_id = match.group(1) - - extended_video_page = self._get_page( - second_stage_url, - {f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} - ) - - if self._is_paginated_video_page(extended_video_page): - PRETTY.warning( - "800 elements do not seem to be enough (or I failed to fetch that many)." - " I will miss elements." - ) - - return self._crawl_video_directory_second_stage(video_dir_path, extended_video_page) - - def _crawl_video_directory_second_stage( - self, - video_dir_path: Path, - video_list_soup: bs4.BeautifulSoup - ) -> List[IliasCrawlerEntry]: - """ - Crawls the "second stage" video page. This page contains the actual video urls. - """ - direct_download_links: List[bs4.Tag] = video_list_soup.findAll( - name="a", text=re.compile(r"\s*Download\s*") - ) - - # Video start links are marked with an "Abspielen" link - video_links: List[bs4.Tag] = video_list_soup.findAll( - name="a", text=re.compile(r"\s*Abspielen\s*") - ) - - results: List[IliasCrawlerEntry] = [] - - # We can download everything directly! - # FIXME: Sadly the download button is currently broken, so never do that - if False and len(direct_download_links) == len(video_links): - for link in direct_download_links: - results += self._crawl_single_video(video_dir_path, link, True) - else: - for link in video_links: - results += self._crawl_single_video(video_dir_path, link, False) - - return results - - def _crawl_single_video( - self, - parent_path: Path, - link: bs4.Tag, - direct_download: bool - ) -> List[IliasCrawlerEntry]: - """ - Crawl a single video based on its "Abspielen" link from the video listing. - """ - # The link is part of a table with multiple columns, describing metadata. 
- # 6th child (1 indexed) is the modification time string - modification_string = link.parent.parent.parent.select_one( - "td.std:nth-child(6)" - ).getText().strip() - modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M") - - title = link.parent.parent.parent.select_one( - "td.std:nth-child(3)" - ).getText().strip() - title += ".mp4" - - video_path: Path = Path(parent_path, _sanitize_path_name(title)) - - video_url = self._abs_url_from_link(link) - - # The video had a direct download button we can use instead - if direct_download: - LOGGER.debug("Using direct download for video %r", str(video_path)) - return [IliasCrawlerEntry( - video_path, video_url, IliasElementType.VIDEO_FILE, modification_time - )] - - return [IliasCrawlerEntry( - video_path, - self._crawl_video_url_from_play_link(video_url), - IliasElementType.VIDEO_FILE, - modification_time - )] - - def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Optional[str]]: - def inner() -> Optional[str]: - # Fetch the actual video page. This is a small wrapper page initializing a javscript - # player. Sadly we can not execute that JS. The actual video stream url is nowhere - # on the page, but defined in a JS object inside a script tag, passed to the player - # library. - # We do the impossible and RegEx the stream JSON object out of the page's HTML source - video_page_soup = soupify(self._session.get(play_url)) - regex: re.Pattern = re.compile( - r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE - ) - json_match = regex.search(str(video_page_soup)) - - if json_match is None: - PRETTY.warning(f"Could not find json stream info for {play_url!r}") - return None - json_str = json_match.group(1) - - # parse it - json_object = json.loads(json_str) - # and fetch the video url! - video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"] - return video_url - return inner - - def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]: - """ - Crawl files offered for download in exercises. - """ - soup = self._get_page(url, {}) - - results: List[IliasCrawlerEntry] = [] - - # Each assignment is in an accordion container - assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer") - - for container in assignment_containers: - # Fetch the container name out of the header to use it in the path - container_name = container.select_one(".ilAssignmentHeader").getText().strip() - # Find all download links in the container (this will contain all the files) - files: List[bs4.Tag] = container.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, - text="Download" - ) - - LOGGER.debug("Found exercise container %r", container_name) - - # Grab each file as you now have the link - for file_link in files: - # Two divs, side by side. 
Left is the name, right is the link ==> get left - # sibling - file_name = file_link.parent.findPrevious(name="div").getText().strip() - file_name = _sanitize_path_name(file_name) - url = self._abs_url_from_link(file_link) - - LOGGER.debug("Found file %r at %r", file_name, url) - - results.append(IliasCrawlerEntry( - Path(element_path, container_name, file_name), - url, - IliasElementType.REGULAR_FILE, - None # We do not have any timestamp - )) - - return results - - @retry_on_io_exception(3, "fetching webpage") - def _get_page(self, url: str, params: Dict[str, Any], - retry_count: int = 0) -> bs4.BeautifulSoup: - """ - Fetches a page from ILIAS, authenticating when needed. - """ - - if retry_count >= 4: - raise FatalException("Could not get a proper page after 4 tries. " - "Maybe your URL is wrong, authentication fails continuously, " - "your ILIAS connection is spotty or ILIAS is not well.") - - LOGGER.debug("Fetching %r", url) - - response = self._session.get(url, params=params) - content_type = response.headers["content-type"] - - if not content_type.startswith("text/html"): - raise FatalException( - f"Invalid content type {content_type} when crawling ilias page" - " {url!r} with {params!r}" - ) - - soup = soupify(response) - - if self._is_logged_in(soup): - return soup - - LOGGER.info("Not authenticated, changing that...") - - self._authenticator.authenticate(self._session) - - return self._get_page(url, params, retry_count + 1) - - @staticmethod - def _is_logged_in(soup: bs4.BeautifulSoup) -> bool: - # Normal ILIAS pages - userlog = soup.find("li", {"id": "userlog"}) - if userlog is not None: - LOGGER.debug("Auth: Found #userlog") - return True - # Video listing embeds do not have complete ILIAS html. Try to match them by - # their video listing table - video_table = soup.find( - recursive=True, - name="table", - attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")} - ) - if video_table is not None: - LOGGER.debug("Auth: Found #tbl_xoct.+") - return True - # The individual video player wrapper page has nothing of the above. - # Match it by its playerContainer. - if soup.select_one("#playerContainer") is not None: - LOGGER.debug("Auth: Found #playerContainer") - return True - return False diff --git a/PFERD/ilias/date_demangler.py b/PFERD/ilias/date_demangler.py deleted file mode 100644 index 2950d4d..0000000 --- a/PFERD/ilias/date_demangler.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -Helper methods to demangle an ILIAS date. -""" - -import datetime -import locale -import logging -import re -from typing import Optional - -from ..logging import PrettyLogger - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -def demangle_date(date: str) -> Optional[datetime.datetime]: - """ - Demangle a given date in one of the following formats: - "Gestern, HH:MM" - "Heute, HH:MM" - "Morgen, HH:MM" - "dd. mon yyyy, HH:MM - """ - saved = locale.setlocale(locale.LC_ALL) - try: - try: - locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8') - except locale.Error: - PRETTY.warning( - "Could not set language to german. Assuming you use english everywhere." - ) - - date = re.sub(r"\s+", " ", date) - date = re.sub("Gestern|Yesterday", _yesterday().strftime("%d. %b %Y"), date, re.I) - date = re.sub("Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I) - date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I) - return datetime.datetime.strptime(date, "%d. 
%b %Y, %H:%M") - except ValueError: - PRETTY.warning(f"Could not parse date {date!r}") - return None - finally: - locale.setlocale(locale.LC_ALL, saved) - - -def _yesterday() -> datetime.date: - return datetime.date.today() - datetime.timedelta(days=1) - - -def _tomorrow() -> datetime.date: - return datetime.date.today() + datetime.timedelta(days=1) diff --git a/PFERD/ilias/downloader.py b/PFERD/ilias/downloader.py deleted file mode 100644 index f6132bf..0000000 --- a/PFERD/ilias/downloader.py +++ /dev/null @@ -1,173 +0,0 @@ -"""Contains a downloader for ILIAS.""" - -import datetime -import logging -import math -import os -from pathlib import Path, PurePath -from typing import Callable, List, Optional, Union - -import bs4 -import requests - -from ..errors import retry_on_io_exception -from ..logging import PrettyLogger -from ..organizer import Organizer -from ..tmp_dir import TmpDir -from ..transform import Transformable -from ..utils import soupify, stream_to_path -from .authenticators import IliasAuthenticator - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -class ContentTypeException(Exception): - """Thrown when the content type of the ilias element can not be handled.""" - - -class IliasDownloadInfo(Transformable): - """ - This class describes a single file to be downloaded. - """ - - def __init__( - self, - path: PurePath, - url: Union[str, Callable[[], Optional[str]]], - modifcation_date: Optional[datetime.datetime] - ): - super().__init__(path) - if isinstance(url, str): - string_url = url - self.url: Callable[[], Optional[str]] = lambda: string_url - else: - self.url = url - self.modification_date = modifcation_date - - -IliasDownloadStrategy = Callable[[Organizer, IliasDownloadInfo], bool] - - -def download_everything(organizer: Organizer, info: IliasDownloadInfo) -> bool: - # pylint: disable=unused-argument - """ - Accepts everything. - """ - return True - - -def download_modified_or_new(organizer: Organizer, info: IliasDownloadInfo) -> bool: - """ - Accepts new files or files with a more recent modification date. - """ - resolved_file = organizer.resolve(info.path) - if not resolved_file.exists() or info.modification_date is None: - return True - resolved_mod_time_seconds = resolved_file.stat().st_mtime - - # Download if the info is newer - if info.modification_date.timestamp() > resolved_mod_time_seconds: - return True - - PRETTY.ignored_file(info.path, "local file has newer or equal modification time") - return False - - -class IliasDownloader: - # pylint: disable=too-many-arguments - """A downloader for ILIAS.""" - - def __init__( - self, - tmp_dir: TmpDir, - organizer: Organizer, - session: requests.Session, - authenticator: IliasAuthenticator, - strategy: IliasDownloadStrategy, - timeout: int = 5 - ): - """ - Create a new IliasDownloader. - - The timeout applies to the download request only, as bwcloud uses IPv6 - and requests has a problem with that: https://github.com/psf/requests/issues/5522 - """ - - self._tmp_dir = tmp_dir - self._organizer = organizer - self._session = session - self._authenticator = authenticator - self._strategy = strategy - self._timeout = timeout - - def download_all(self, infos: List[IliasDownloadInfo]) -> None: - """ - Download multiple files one after the other. - """ - - for info in infos: - self.download(info) - - def download(self, info: IliasDownloadInfo) -> None: - """ - Download a file from ILIAS. - - Retries authentication until eternity if it could not fetch the file. 
- """ - - LOGGER.debug("Downloading %r", info) - - if not self._strategy(self._organizer, info): - self._organizer.mark(info.path) - return - - tmp_file = self._tmp_dir.new_path() - - @retry_on_io_exception(3, "downloading file") - def download_impl() -> bool: - if not self._try_download(info, tmp_file): - LOGGER.info("Re-Authenticating due to download failure: %r", info) - self._authenticator.authenticate(self._session) - raise IOError("Scheduled retry") - else: - return True - - if not download_impl(): - PRETTY.error(f"Download of file {info.path} failed too often! Skipping it...") - return - - dst_path = self._organizer.accept_file(tmp_file, info.path) - if dst_path and info.modification_date: - os.utime( - dst_path, - times=( - math.ceil(info.modification_date.timestamp()), - math.ceil(info.modification_date.timestamp()) - ) - ) - - def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool: - url = info.url() - if url is None: - PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/") - return True - - with self._session.get(url, stream=True, timeout=self._timeout) as response: - content_type = response.headers["content-type"] - has_content_disposition = "content-disposition" in response.headers - - if content_type.startswith("text/html") and not has_content_disposition: - if self._is_logged_in(soupify(response)): - raise ContentTypeException("Attempting to download a web page, not a file") - - return False - - # Yay, we got the file :) - stream_to_path(response, target, info.path.name) - return True - - @staticmethod - def _is_logged_in(soup: bs4.BeautifulSoup) -> bool: - userlog = soup.find("li", {"id": "userlog"}) - return userlog is not None diff --git a/PFERD/ipd.py b/PFERD/ipd.py deleted file mode 100644 index ece6a97..0000000 --- a/PFERD/ipd.py +++ /dev/null @@ -1,154 +0,0 @@ -""" -Utility functions and a scraper/downloader for the IPD pages. -""" -import datetime -import logging -import math -import os -from dataclasses import dataclass -from pathlib import Path -from typing import Callable, List, Optional -from urllib.parse import urljoin - -import bs4 -import requests - -from PFERD.errors import FatalException -from PFERD.utils import soupify - -from .logging import PrettyLogger -from .organizer import Organizer -from .tmp_dir import TmpDir -from .transform import Transformable -from .utils import stream_to_path - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -@dataclass -class IpdDownloadInfo(Transformable): - """ - Information about an ipd entry. - """ - url: str - modification_date: Optional[datetime.datetime] - - -IpdDownloadStrategy = Callable[[Organizer, IpdDownloadInfo], bool] - - -def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) -> bool: - """ - Accepts new files or files with a more recent modification date. - """ - resolved_file = organizer.resolve(info.path) - if not resolved_file.exists(): - return True - if not info.modification_date: - PRETTY.ignored_file(info.path, "could not find modification time, file exists") - return False - - resolved_mod_time_seconds = resolved_file.stat().st_mtime - - # Download if the info is newer - if info.modification_date.timestamp() > resolved_mod_time_seconds: - return True - - PRETTY.ignored_file(info.path, "local file has newer or equal modification time") - return False - - -class IpdCrawler: - # pylint: disable=too-few-public-methods - """ - A crawler for IPD pages. 
- """ - - def __init__(self, base_url: str): - self._base_url = base_url - - def _abs_url_from_link(self, link_tag: bs4.Tag) -> str: - """ - Create an absolute url from an tag. - """ - return urljoin(self._base_url, link_tag.get("href")) - - def crawl(self) -> List[IpdDownloadInfo]: - """ - Crawls the playlist given in the constructor. - """ - page = soupify(requests.get(self._base_url)) - - items: List[IpdDownloadInfo] = [] - - def is_relevant_url(x: str) -> bool: - return x.endswith(".pdf") or x.endswith(".c") or x.endswith(".java") or x.endswith(".zip") - - for link in page.findAll(name="a", attrs={"href": lambda x: x and is_relevant_url(x)}): - href: str = link.attrs.get("href") - name = href.split("/")[-1] - - modification_date: Optional[datetime.datetime] = None - try: - enclosing_row: bs4.Tag = link.findParent(name="tr") - if enclosing_row: - date_text = enclosing_row.find(name="td").text - modification_date = datetime.datetime.strptime(date_text, "%d.%m.%Y") - except ValueError: - modification_date = None - - items.append(IpdDownloadInfo( - Path(name), - url=self._abs_url_from_link(link), - modification_date=modification_date - )) - - return items - - -class IpdDownloader: - """ - A downloader for ipd files. - """ - - def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy): - self._tmp_dir = tmp_dir - self._organizer = organizer - self._strategy = strategy - self._session = requests.session() - - def download_all(self, infos: List[IpdDownloadInfo]) -> None: - """ - Download multiple files one after the other. - """ - for info in infos: - self.download(info) - - def download(self, info: IpdDownloadInfo) -> None: - """ - Download a single file. - """ - if not self._strategy(self._organizer, info): - self._organizer.mark(info.path) - return - - with self._session.get(info.url, stream=True) as response: - if response.status_code == 200: - tmp_file = self._tmp_dir.new_path() - stream_to_path(response, tmp_file, info.path.name) - dst_path = self._organizer.accept_file(tmp_file, info.path) - - if dst_path and info.modification_date: - os.utime( - dst_path, - times=( - math.ceil(info.modification_date.timestamp()), - math.ceil(info.modification_date.timestamp()) - ) - ) - - elif response.status_code == 403: - raise FatalException("Received 403. Are you not using the KIT VPN?") - else: - PRETTY.warning(f"Could not download file, got response {response.status_code}") diff --git a/PFERD/location.py b/PFERD/location.py deleted file mode 100644 index 7f4c8ca..0000000 --- a/PFERD/location.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -Contains a Location class for objects with an inherent path. -""" - -from pathlib import Path, PurePath - - -class ResolveException(Exception): - """An exception while resolving a file.""" - # TODO take care of this when doing exception handling - - -class Location: - """ - An object that has an inherent path. - """ - - def __init__(self, path: Path): - self._path = path.resolve() - - @property - def path(self) -> Path: - """ - This object's location. - """ - - return self._path - - def resolve(self, target: PurePath) -> Path: - """ - Resolve a file relative to the path of this location. - - Raises a [ResolveException] if the file is outside the given directory. 
- """ - absolute_path = self.path.joinpath(target).resolve() - - # TODO Make this less inefficient - if self.path not in absolute_path.parents: - raise ResolveException(f"Path {target} is not inside directory {self.path}") - - return absolute_path diff --git a/PFERD/logging.py b/PFERD/logging.py deleted file mode 100644 index c25019e..0000000 --- a/PFERD/logging.py +++ /dev/null @@ -1,184 +0,0 @@ -""" -Contains a few logger utility functions and implementations. -""" - -import logging -from typing import Optional - -from rich._log_render import LogRender -from rich.console import Console -from rich.style import Style -from rich.text import Text -from rich.theme import Theme - -from .download_summary import DownloadSummary -from .utils import PathLike, to_path - -STYLE = "{" -FORMAT = "[{levelname:<7}] {message}" -DATE_FORMAT = "%F %T" - - -def enable_logging(name: str = "PFERD", level: int = logging.INFO) -> None: - """ - Enable and configure logging via the logging module. - """ - - logger = logging.getLogger(name) - logger.setLevel(level) - logger.addHandler(RichLoggingHandler(level=level)) - - # This should be logged by our own handler, and not the root logger's - # default handler, so we don't pass it on to the root logger. - logger.propagate = False - - -class RichLoggingHandler(logging.Handler): - """ - A logging handler that uses rich for highlighting - """ - - def __init__(self, level: int) -> None: - super().__init__(level=level) - self.console = Console(theme=Theme({ - "logging.level.warning": Style(color="yellow") - })) - self._log_render = LogRender(show_level=True, show_time=False, show_path=False) - - def emit(self, record: logging.LogRecord) -> None: - """ - Invoked by logging. - """ - log_style = f"logging.level.{record.levelname.lower()}" - message = self.format(record) - - level = Text() - level.append(record.levelname, log_style) - message_text = Text.from_markup(message) - - self.console.print( - self._log_render( - self.console, - [message_text], - level=level, - ) - ) - - -class PrettyLogger: - """ - A logger that prints some specially formatted log messages in color. - """ - - def __init__(self, logger: logging.Logger) -> None: - self.logger = logger - - @staticmethod - def _format_path(path: PathLike) -> str: - return repr(str(to_path(path))) - - def error(self, message: str) -> None: - """ - Print an error message indicating some operation fatally failed. - """ - self.logger.error( - f"[bold red]{message}[/bold red]" - ) - - def warning(self, message: str) -> None: - """ - Print a warning message indicating some operation failed, but the error can be recovered - or ignored. - """ - self.logger.warning( - f"[bold yellow]{message}[/bold yellow]" - ) - - def modified_file(self, path: PathLike) -> None: - """ - An existing file has changed. - """ - - self.logger.info( - f"[bold magenta]Modified {self._format_path(path)}.[/bold magenta]" - ) - - def new_file(self, path: PathLike) -> None: - """ - A new file has been downloaded. - """ - - self.logger.info( - f"[bold green]Created {self._format_path(path)}.[/bold green]" - ) - - def deleted_file(self, path: PathLike) -> None: - """ - A file has been deleted. - """ - - self.logger.info( - f"[bold red]Deleted {self._format_path(path)}.[/bold red]" - ) - - def ignored_file(self, path: PathLike, reason: str) -> None: - """ - File was not downloaded or modified. 
- """ - - self.logger.info( - f"[dim]Ignored {self._format_path(path)} " - f"([/dim]{reason}[dim]).[/dim]" - ) - - def searching(self, path: PathLike) -> None: - """ - A crawler searches a particular object. - """ - - self.logger.info(f"Searching {self._format_path(path)}") - - def not_searching(self, path: PathLike, reason: str) -> None: - """ - A crawler does not search a particular object. - """ - - self.logger.info( - f"[dim]Not searching {self._format_path(path)} " - f"([/dim]{reason}[dim]).[/dim]" - ) - - def summary(self, download_summary: DownloadSummary) -> None: - """ - Prints a download summary. - """ - self.logger.info("") - self.logger.info("[bold cyan]Download Summary[/bold cyan]") - if not download_summary.has_updates(): - self.logger.info("[bold dim]Nothing changed![/bold dim]") - return - - for new_file in download_summary.new_files: - self.new_file(new_file) - for modified_file in download_summary.modified_files: - self.modified_file(modified_file) - for deleted_files in download_summary.deleted_files: - self.deleted_file(deleted_files) - - def starting_synchronizer( - self, - target_directory: PathLike, - synchronizer_name: str, - subject: Optional[str] = None, - ) -> None: - """ - A special message marking that a synchronizer has been started. - """ - - subject_str = f"{subject} " if subject else "" - self.logger.info("") - self.logger.info(( - f"[bold cyan]Synchronizing " - f"{subject_str}to {self._format_path(target_directory)} " - f"using the {synchronizer_name} synchronizer.[/bold cyan]" - )) diff --git a/PFERD/organizer.py b/PFERD/organizer.py deleted file mode 100644 index fe5052b..0000000 --- a/PFERD/organizer.py +++ /dev/null @@ -1,224 +0,0 @@ -"""A simple helper for managing downloaded files. - -A organizer is bound to a single directory. -""" - -import filecmp -import logging -import os -import shutil -from enum import Enum -from pathlib import Path, PurePath -from typing import Callable, List, Optional, Set - -from .download_summary import DownloadSummary -from .location import Location -from .logging import PrettyLogger -from .utils import prompt_yes_no - -LOGGER = logging.getLogger(__name__) -PRETTY = PrettyLogger(LOGGER) - - -class ConflictType(Enum): - """ - The type of the conflict. A file might not exist anymore and will be deleted - or it might be overwritten with a newer version. - - FILE_OVERWRITTEN: An existing file will be updated - MARKED_FILE_OVERWRITTEN: A file is written for the second+ time in this run - FILE_DELETED: The file was deleted - """ - FILE_OVERWRITTEN = "overwritten" - MARKED_FILE_OVERWRITTEN = "marked_file_overwritten" - FILE_DELETED = "deleted" - - -class FileConflictResolution(Enum): - """ - The reaction when confronted with a file conflict: - - DESTROY_EXISTING: Delete/overwrite the current file - KEEP_EXISTING: Keep the current file - DEFAULT: Do whatever the PFERD authors thought is sensible - PROMPT: Interactively ask the user - """ - - DESTROY_EXISTING = "destroy" - - KEEP_EXISTING = "keep" - - DEFAULT = "default" - - PROMPT = "prompt" - - -FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution] - - -def resolve_prompt_user(_path: PurePath, conflict: ConflictType) -> FileConflictResolution: - """ - Resolves conflicts by asking the user if a file was written twice or will be deleted. 
- """ - if conflict == ConflictType.FILE_OVERWRITTEN: - return FileConflictResolution.DESTROY_EXISTING - return FileConflictResolution.PROMPT - - -class FileAcceptException(Exception): - """An exception while accepting a file.""" - - -class Organizer(Location): - """A helper for managing downloaded files.""" - - def __init__(self, path: Path, conflict_resolver: FileConflictResolver = resolve_prompt_user): - """Create a new organizer for a given path.""" - super().__init__(path) - self._known_files: Set[Path] = set() - - # Keep the root dir - self._known_files.add(path.resolve()) - - self.download_summary = DownloadSummary() - - self.conflict_resolver = conflict_resolver - - def accept_file(self, src: Path, dst: PurePath) -> Optional[Path]: - """ - Move a file to this organizer and mark it. - - Returns the path the file was moved to, to allow the caller to adjust the metadata. - As you might still need to adjust the metadata when the file was identical - (e.g. update the timestamp), the path is also returned in this case. - In all other cases (ignored, not overwritten, etc.) this method returns None. - """ - # Windows limits the path length to 260 for *some* historical reason - # If you want longer paths, you will have to add the "\\?\" prefix in front of - # your path... - # See: - # https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation - if os.name == 'nt': - src_absolute = Path("\\\\?\\" + str(src.resolve())) - dst_absolute = Path("\\\\?\\" + str(self.resolve(dst))) - else: - src_absolute = src.resolve() - dst_absolute = self.resolve(dst) - - if not src_absolute.exists(): - raise FileAcceptException("Source file does not exist") - - if not src_absolute.is_file(): - raise FileAcceptException("Source is a directory") - - LOGGER.debug("Copying %s to %s", src_absolute, dst_absolute) - - if self._is_marked(dst): - PRETTY.warning(f"File {str(dst_absolute)!r} was already written!") - conflict = ConflictType.MARKED_FILE_OVERWRITTEN - if self._resolve_conflict("Overwrite file?", dst_absolute, conflict, default=False): - PRETTY.ignored_file(dst_absolute, "file was written previously") - return None - - # Destination file is directory - if dst_absolute.exists() and dst_absolute.is_dir(): - prompt = f"Overwrite folder {dst_absolute} with file?" - conflict = ConflictType.FILE_OVERWRITTEN - if self._resolve_conflict(prompt, dst_absolute, conflict, default=False): - shutil.rmtree(dst_absolute) - else: - PRETTY.warning(f"Could not add file {str(dst_absolute)!r}") - return None - - # Destination file exists - if dst_absolute.exists() and dst_absolute.is_file(): - if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False): - # Bail out, nothing more to do - PRETTY.ignored_file(dst_absolute, "same file contents") - self.mark(dst) - return dst_absolute - - prompt = f"Overwrite file {dst_absolute}?" 
- conflict = ConflictType.FILE_OVERWRITTEN - if not self._resolve_conflict(prompt, dst_absolute, conflict, default=True): - PRETTY.ignored_file(dst_absolute, "user conflict resolution") - return None - - self.download_summary.add_modified_file(dst_absolute) - PRETTY.modified_file(dst_absolute) - else: - self.download_summary.add_new_file(dst_absolute) - PRETTY.new_file(dst_absolute) - - # Create parent dir if needed - dst_parent_dir: Path = dst_absolute.parent - dst_parent_dir.mkdir(exist_ok=True, parents=True) - - # Move file - shutil.move(str(src_absolute), str(dst_absolute)) - - self.mark(dst) - - return dst_absolute - - def mark(self, path: PurePath) -> None: - """Mark a file as used so it will not get cleaned up.""" - absolute_path = self.resolve(path) - self._known_files.add(absolute_path) - LOGGER.debug("Tracked %s", absolute_path) - - def _is_marked(self, path: PurePath) -> bool: - """ - Checks whether a file is marked. - """ - absolute_path = self.resolve(path) - return absolute_path in self._known_files - - def cleanup(self) -> None: - """Remove all untracked files in the organizer's dir.""" - LOGGER.debug("Deleting all untracked files...") - - self._cleanup(self.path) - - def _cleanup(self, start_dir: Path) -> None: - if not start_dir.exists(): - return - paths: List[Path] = list(start_dir.iterdir()) - - # Recursively clean paths - for path in paths: - if path.is_dir(): - self._cleanup(path) - else: - if path.resolve() not in self._known_files: - self._delete_file_if_confirmed(path) - - # Delete dir if it was empty and untracked - dir_empty = len(list(start_dir.iterdir())) == 0 - if start_dir.resolve() not in self._known_files and dir_empty: - start_dir.rmdir() - - def _delete_file_if_confirmed(self, path: Path) -> None: - prompt = f"Do you want to delete {path}" - - if self._resolve_conflict(prompt, path, ConflictType.FILE_DELETED, default=False): - self.download_summary.add_deleted_file(path) - path.unlink() - else: - PRETTY.ignored_file(path, "user conflict resolution") - - def _resolve_conflict( - self, prompt: str, path: Path, conflict: ConflictType, default: bool - ) -> bool: - if not self.conflict_resolver: - return prompt_yes_no(prompt, default=default) - - result = self.conflict_resolver(path, conflict) - if result == FileConflictResolution.DEFAULT: - return default - if result == FileConflictResolution.KEEP_EXISTING: - return False - if result == FileConflictResolution.DESTROY_EXISTING: - return True - - return prompt_yes_no(prompt, default=default) diff --git a/PFERD/progress.py b/PFERD/progress.py deleted file mode 100644 index 6ad098f..0000000 --- a/PFERD/progress.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -A small progress bar implementation. -""" -import sys -from dataclasses import dataclass -from types import TracebackType -from typing import Optional, Type - -import requests -from rich.console import Console -from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, - TextColumn, TimeRemainingColumn, - TransferSpeedColumn) - -_progress: Progress = Progress( - TextColumn("[bold blue]{task.fields[name]}", justify="right"), - BarColumn(bar_width=None), - "[progress.percentage]{task.percentage:>3.1f}%", - "•", - DownloadColumn(), - "•", - TransferSpeedColumn(), - "•", - TimeRemainingColumn(), - console=Console(file=sys.stdout), - transient=True -) - - -def size_from_headers(response: requests.Response) -> Optional[int]: - """ - Return the size of the download based on the response headers. 
- - Arguments: - response {requests.Response} -- the response - - Returns: - Optional[int] -- the size - """ - if "Content-Length" in response.headers: - return int(response.headers["Content-Length"]) - return None - - -@dataclass -class ProgressSettings: - """ - Settings you can pass to customize the progress bar. - """ - name: str - max_size: int - - -def progress_for(settings: Optional[ProgressSettings]) -> 'ProgressContextManager': - """ - Returns a context manager that displays progress - - Returns: - ProgressContextManager -- the progress manager - """ - return ProgressContextManager(settings) - - -class ProgressContextManager: - """ - A context manager used for displaying progress. - """ - - def __init__(self, settings: Optional[ProgressSettings]): - self._settings = settings - self._task_id: Optional[TaskID] = None - - def __enter__(self) -> 'ProgressContextManager': - """Context manager entry function.""" - if not self._settings: - return self - - _progress.start() - self._task_id = _progress.add_task( - self._settings.name, - total=self._settings.max_size, - name=self._settings.name - ) - return self - - # pylint: disable=useless-return - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> Optional[bool]: - """Context manager exit function. Removes the task.""" - if self._task_id is None: - return None - - _progress.remove_task(self._task_id) - - if len(_progress.task_ids) == 0: - # We need to clean up after ourselves, as we were the last one - _progress.stop() - _progress.refresh() - - return None - - def advance(self, amount: float) -> None: - """ - Advances the progress bar. - """ - if self._task_id is not None: - _progress.advance(self._task_id, amount) diff --git a/PFERD/tmp_dir.py b/PFERD/tmp_dir.py deleted file mode 100644 index 51ade2d..0000000 --- a/PFERD/tmp_dir.py +++ /dev/null @@ -1,79 +0,0 @@ -"""Helper functions and classes for temporary folders.""" - -import logging -import shutil -from pathlib import Path -from types import TracebackType -from typing import Optional, Type - -from .location import Location - -LOGGER = logging.getLogger(__name__) - - -class TmpDir(Location): - """A temporary folder that can create files or nested temp folders.""" - - def __init__(self, path: Path): - """Create a new temporary folder for the given path.""" - super().__init__(path) - self._counter = 0 - self.cleanup() - self.path.mkdir(parents=True, exist_ok=True) - - def __str__(self) -> str: - """Format the folder as a string.""" - return f"Folder at {self.path}" - - def __enter__(self) -> 'TmpDir': - """Context manager entry function.""" - return self - - # pylint: disable=useless-return - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> Optional[bool]: - """Context manager exit function. Calls cleanup().""" - self.cleanup() - return None - - def new_path(self, prefix: Optional[str] = None) -> Path: - """ - Return a unique path inside the directory. Doesn't create a file or - directory. - """ - - name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}" - - LOGGER.debug("Creating temp file %s", name) - - return self.resolve(Path(name)) - - def new_subdir(self, prefix: Optional[str] = None) -> 'TmpDir': - """ - Create a new nested temporary folder and return it. 
- """ - - name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}" - sub_path = self.resolve(Path(name)) - sub_path.mkdir(parents=True) - - LOGGER.debug("Creating temp dir %s at %s", name, sub_path) - - return TmpDir(sub_path) - - def cleanup(self) -> None: - """Delete this folder and all contained files.""" - LOGGER.debug("Deleting temp folder %s", self.path) - - if self.path.resolve().exists(): - shutil.rmtree(self.path.resolve()) - - def _inc_and_get_counter(self) -> int: - """Get and increment the counter by one.""" - counter = self._counter - self._counter += 1 - return counter diff --git a/PFERD/transform.py b/PFERD/transform.py deleted file mode 100644 index a2152ba..0000000 --- a/PFERD/transform.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -Transforms let the user define functions to decide where the downloaded files -should be placed locally. They let the user do more advanced things like moving -only files whose names match a regex, or renaming files from one numbering -scheme to another. -""" - -import os -import re -from dataclasses import dataclass -from pathlib import PurePath -from typing import Callable, List, Optional, TypeVar - -from .utils import PathLike, Regex, to_path, to_pattern - -Transform = Callable[[PurePath], Optional[PurePath]] - - -@dataclass -class Transformable: - """ - An object that can be transformed by a Transform. - """ - - path: PurePath - - -TF = TypeVar("TF", bound=Transformable) - - -def apply_transform( - transform: Transform, - transformables: List[TF], -) -> List[TF]: - """ - Apply a Transform to multiple Transformables, discarding those that were - not transformed by the Transform. - """ - - result: List[TF] = [] - for transformable in transformables: - new_path = transform(transformable.path) - if new_path: - transformable.path = new_path - result.append(transformable) - return result - -# Transform combinators - -def keep(path: PurePath) -> Optional[PurePath]: - return path - -def attempt(*args: Transform) -> Transform: - def inner(path: PurePath) -> Optional[PurePath]: - for transform in args: - result = transform(path) - if result: - return result - return None - return inner - -def optionally(transform: Transform) -> Transform: - return attempt(transform, lambda path: path) - -def do(*args: Transform) -> Transform: - def inner(path: PurePath) -> Optional[PurePath]: - current = path - for transform in args: - result = transform(current) - if result: - current = result - else: - return None - return current - return inner - -def predicate(pred: Callable[[PurePath], bool]) -> Transform: - def inner(path: PurePath) -> Optional[PurePath]: - if pred(path): - return path - return None - return inner - -def glob(pattern: str) -> Transform: - return predicate(lambda path: path.match(pattern)) - -def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform: - source_path = to_path(source_dir) - target_path = to_path(target_dir) - def inner(path: PurePath) -> Optional[PurePath]: - if source_path in path.parents: - return target_path / path.relative_to(source_path) - return None - return inner - -def move(source: PathLike, target: PathLike) -> Transform: - source_path = to_path(source) - target_path = to_path(target) - def inner(path: PurePath) -> Optional[PurePath]: - if path == source_path: - return target_path - return None - return inner - -def rename(source: str, target: str) -> Transform: - def inner(path: PurePath) -> Optional[PurePath]: - if path.name == source: - return path.with_name(target) - return None - return inner - 
-def re_move(regex: Regex, target: str) -> Transform: - def inner(path: PurePath) -> Optional[PurePath]: - match = to_pattern(regex).fullmatch(str(path)) - if match: - groups = [match.group(0)] - groups.extend(match.groups()) - return PurePath(target.format(*groups)) - return None - return inner - -def re_rename(regex: Regex, target: str) -> Transform: - def inner(path: PurePath) -> Optional[PurePath]: - match = to_pattern(regex).fullmatch(path.name) - if match: - groups = [match.group(0)] - groups.extend(match.groups()) - return path.with_name(target.format(*groups)) - return None - return inner - - -def sanitize_windows_path(path: PurePath) -> PurePath: - """ - A small function to escape characters that are forbidden in windows path names. - This method is a no-op on other operating systems. - """ - # Escape windows illegal path characters - if os.name == 'nt': - sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x) for x in list(path.parts)] - return PurePath(*sanitized_parts) - return path
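As a rough illustration of how the removed transform combinators composed in practice, here is a small, hypothetical usage sketch. It is not part of the deleted code: the course names, directory layout, and file names are invented for the example, and only functions defined in PFERD/transform.py above are used.

from pathlib import PurePath

from PFERD.transform import (Transformable, apply_transform, attempt, do,
                             glob, move_dir, re_rename)

# Keep lecture slides (renumbered) and exercise sheets; drop everything else.
# `attempt` tries each branch until one returns a path; `do` chains steps and
# fails as soon as one step returns None.
transform = attempt(
    do(
        glob("Vorlesung/*.pdf"),
        move_dir("Vorlesung", "Slides"),
        re_rename(r"folien-(\d+)\.pdf", "lecture_{1}.pdf"),
    ),
    do(
        glob("Uebungen/*.pdf"),
        move_dir("Uebungen", "Exercises"),
    ),
)

infos = [
    Transformable(PurePath("Vorlesung/folien-03.pdf")),  # -> Slides/lecture_03.pdf
    Transformable(PurePath("Uebungen/blatt01.pdf")),     # -> Exercises/blatt01.pdf
    Transformable(PurePath("Sonstiges/notes.txt")),      # dropped: no branch matches
]
kept = apply_transform(transform, infos)

# Since sanitize_windows_path always returns a path (never None), it composes
# with the other transforms, e.g. do(<transform>, sanitize_windows_path), to
# escape forbidden characters on Windows as a final step.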