Delete old files

I should've done this earlier
This commit is contained in:
Joscha 2021-05-14 00:20:59 +02:00
parent 6e5fdf4e9e
commit a673ab0fae
18 changed files with 0 additions and 2647 deletions

View File

@ -1,214 +0,0 @@
"""
General authenticators useful in many situations
"""
import getpass
import logging
from typing import Optional, Tuple
from .logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
try:
import keyring
except ImportError:
pass
class TfaAuthenticator:
# pylint: disable=too-few-public-methods
"""
An authenticator for a TFA token. Always prompts the user, as the token can not be cached.
"""
def __init__(self, reason: str):
"""
Create a new tfa authenticator.
Arguments:
reason {str} -- the reason for obtaining the credentials
"""
self._reason = reason
def get_token(self) -> str:
# pylint: disable=no-self-use
"""
Prompts the user for the token and returns it.
"""
print(f"Enter credentials ({self._reason})")
return getpass.getpass("TFA Token: ")
class UserPassAuthenticator:
"""
An authenticator for username-password combinations that prompts the user
for missing information.
"""
def __init__(
self,
reason: str,
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""
reason - what the credentials are used for
username - the username (if already known)
password - the password (if already known)
"""
self._reason = reason
self._given_username = username
self._given_password = password
self._username = username
self._password = password
def get_credentials(self) -> Tuple[str, str]:
"""
Returns a tuple (username, password). Prompts user for username or
password when necessary.
"""
if self._username is None and self._given_username is not None:
self._username = self._given_username
if self._password is None and self._given_password is not None:
self._password = self._given_password
if self._username is None or self._password is None:
print(f"Enter credentials ({self._reason})")
username: str
if self._username is None:
username = input("Username: ")
self._username = username
else:
username = self._username
password: str
if self._password is None:
password = getpass.getpass(prompt="Password: ")
self._password = password
else:
password = self._password
return (username, password)
@property
def username(self) -> str:
"""
The username. Accessing this property may cause the authenticator to
prompt the user.
"""
(username, _) = self.get_credentials()
return username
@property
def password(self) -> str:
"""
The password. Accessing this property may cause the authenticator to
prompt the user.
"""
(_, password) = self.get_credentials()
return password
def invalidate_credentials(self) -> None:
"""
Marks the credentials as invalid. If only a username was supplied in
the constructor, assumes that the username is valid and only the
password is invalid. If only a password was supplied in the
constructor, assumes that the password is valid and only the username
is invalid. Otherwise, assumes that username and password are both
invalid.
"""
self._username = None
self._password = None
if self._given_username is not None and self._given_password is not None:
self._given_username = None
self._given_password = None
class KeyringAuthenticator(UserPassAuthenticator):
"""
An authenticator for username-password combinations that stores the
password using the system keyring service and prompts the user for missing
information.
"""
def get_credentials(self) -> Tuple[str, str]:
"""
Returns a tuple (username, password). Prompts user for username or
password when necessary.
"""
if self._username is None and self._given_username is not None:
self._username = self._given_username
if self._password is None and self._given_password is not None:
self._password = self._given_password
if self._username is not None and self._password is None:
self._load_password()
if self._username is None or self._password is None:
print(f"Enter credentials ({self._reason})")
username: str
if self._username is None:
username = input("Username: ")
self._username = username
else:
username = self._username
if self._password is None:
self._load_password()
password: str
if self._password is None:
password = getpass.getpass(prompt="Password: ")
self._password = password
self._save_password()
else:
password = self._password
return (username, password)
def _load_password(self) -> None:
"""
Loads the saved password associated with self._username from the system
keyring service (or None if not password has been saved yet) and stores
it in self._password.
"""
self._password = keyring.get_password("pferd-ilias", self._username)
def _save_password(self) -> None:
"""
Saves self._password to the system keyring service and associates it
with self._username.
"""
keyring.set_password("pferd-ilias", self._username, self._password)
def invalidate_credentials(self) -> None:
"""
Marks the credentials as invalid. If only a username was supplied in
the constructor, assumes that the username is valid and only the
password is invalid. If only a password was supplied in the
constructor, assumes that the password is valid and only the username
is invalid. Otherwise, assumes that username and password are both
invalid.
"""
try:
keyring.delete_password("pferd-ilias", self._username)
except keyring.errors.PasswordDeleteError:
pass
super().invalidate_credentials()

View File

@ -1,69 +0,0 @@
"""A helper for requests cookies."""
import logging
from http.cookiejar import LoadError, LWPCookieJar
from pathlib import Path
from typing import Optional
import requests
LOGGER = logging.getLogger(__name__)
class CookieJar:
"""A cookie jar that can be persisted."""
def __init__(self, cookie_file: Optional[Path] = None) -> None:
"""Create a new cookie jar at the given path.
If the path is None, the cookies will not be persisted.
"""
self._cookies: LWPCookieJar
if cookie_file is None:
self._cookies = LWPCookieJar()
else:
self._cookies = LWPCookieJar(str(cookie_file.resolve()))
@property
def cookies(self) -> LWPCookieJar:
"""Return the requests cookie jar."""
return self._cookies
def load_cookies(self) -> None:
"""Load all cookies from the file given in the constructor."""
if self._cookies.filename is None:
return
try:
LOGGER.info("Loading old cookies from %s", self._cookies.filename)
self._cookies.load(ignore_discard=True)
except (FileNotFoundError, LoadError):
LOGGER.warning(
"No valid cookie file found at %s, continuing with no cookies",
self._cookies.filename
)
def save_cookies(self, reason: Optional[str] = None) -> None:
"""Save the cookies in the file given in the constructor."""
if self._cookies.filename is None:
return
if reason is None:
LOGGER.info("Saving cookies")
else:
LOGGER.info("Saving cookies (%s)", reason)
# TODO figure out why ignore_discard is set
# TODO possibly catch a few more exceptions
self._cookies.save(ignore_discard=True)
def create_session(self) -> requests.Session:
"""Create a new session using the cookie jar."""
sess = requests.Session()
# From the request docs: "All requests code should work out of the box
# with externally provided instances of CookieJar, e.g. LWPCookieJar
# and FileCookieJar."
sess.cookies = self.cookies # type: ignore
return sess

View File

@ -1,169 +0,0 @@
"""
Utility functions and a scraper/downloader for the KIT DIVA portal.
"""
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, List, Optional
import requests
from .errors import FatalException
from .logging import PrettyLogger
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
@dataclass
class DivaDownloadInfo(Transformable):
"""
Information about a DIVA video
"""
url: str
DivaDownloadStrategy = Callable[[Organizer, DivaDownloadInfo], bool]
def diva_download_new(organizer: Organizer, info: DivaDownloadInfo) -> bool:
"""
Accepts only new files.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists():
return True
PRETTY.ignored_file(info.path, "local file exists")
return False
class DivaPlaylistCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for DIVA playlists.
"""
_PLAYLIST_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/detail/"
_COLLECTION_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/collection.json"
def __init__(self, playlist_id: str):
self._id = playlist_id
@classmethod
def fetch_id(cls, playlist_link: str) -> str:
"""
Fetches the ID for a playerlist, given the base link
(e.g. https://mediaservice.bibliothek.kit.edu/#/details/DIVA-2019-271).
Raises a FatalException, if the id can not be resolved
"""
match = re.match(r".+#/details/(.+)", playlist_link)
if match is None:
raise FatalException(
"DIVA: Invalid playlist link format, could not extract details."
)
base_name = match.group(1)
response = requests.get(cls._PLAYLIST_BASE_URL + base_name + ".json")
if response.status_code != 200:
raise FatalException(
f"DIVA: Got non-200 status code ({response.status_code}))"
f"when requesting {response.url!r}!"
)
body = response.json()
if body["error"]:
raise FatalException(f"DIVA: Server returned error {body['error']!r}.")
return body["result"]["collection"]["id"]
def crawl(self) -> List[DivaDownloadInfo]:
"""
Crawls the playlist given in the constructor.
"""
response = requests.get(self._COLLECTION_BASE_URL, params={"collection": self._id})
if response.status_code != 200:
raise FatalException(f"Server returned status {response.status_code}.")
body = response.json()
if body["error"]:
raise FatalException(f"Server returned error {body['error']!r}.")
result = body["result"]
if result["resultCount"] > result["pageSize"]:
PRETTY.warning("Did not receive all results, some will be missing")
download_infos: List[DivaDownloadInfo] = []
for video in result["resultList"]:
title = video["title"]
collection_title = self._follow_path(["collection", "title"], video)
url = self._follow_path(
["resourceList", "derivateList", "mp4", "url"],
video
)
if url and collection_title and title:
path = Path(collection_title, title + ".mp4")
download_infos.append(DivaDownloadInfo(path, url))
else:
PRETTY.warning(f"Incomplete video found: {title!r} {collection_title!r} {url!r}")
return download_infos
@staticmethod
def _follow_path(path: List[str], obj: Any) -> Optional[Any]:
"""
Follows a property path through an object, bailing at the first None.
"""
current = obj
for path_step in path:
if path_step in current:
current = current[path_step]
else:
return None
return current
class DivaDownloader:
"""
A downloader for DIVA videos.
"""
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: DivaDownloadStrategy):
self._tmp_dir = tmp_dir
self._organizer = organizer
self._strategy = strategy
self._session = requests.session()
def download_all(self, infos: List[DivaDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: DivaDownloadInfo) -> None:
"""
Download a single file.
"""
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
with self._session.get(info.url, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path)
else:
PRETTY.warning(f"Could not download file, got response {response.status_code}")

View File

@ -1,75 +0,0 @@
"""
Provides a summary that keeps track of new modified or deleted files.
"""
from pathlib import Path
from typing import List
def _mergeNoDuplicate(first: List[Path], second: List[Path]) -> List[Path]:
tmp = list(set(first + second))
tmp.sort(key=lambda x: str(x.resolve()))
return tmp
class DownloadSummary:
"""
Keeps track of all new, modified or deleted files and provides a summary.
"""
def __init__(self) -> None:
self._new_files: List[Path] = []
self._modified_files: List[Path] = []
self._deleted_files: List[Path] = []
@property
def new_files(self) -> List[Path]:
"""
Returns all new files.
"""
return self._new_files.copy()
@property
def modified_files(self) -> List[Path]:
"""
Returns all modified files.
"""
return self._modified_files.copy()
@property
def deleted_files(self) -> List[Path]:
"""
Returns all deleted files.
"""
return self._deleted_files.copy()
def merge(self, summary: 'DownloadSummary') -> None:
"""
Merges ourselves with the passed summary. Modifies this object, but not the passed one.
"""
self._new_files = _mergeNoDuplicate(self._new_files, summary.new_files)
self._modified_files = _mergeNoDuplicate(self._modified_files, summary.modified_files)
self._deleted_files = _mergeNoDuplicate(self._deleted_files, summary.deleted_files)
def add_deleted_file(self, path: Path) -> None:
"""
Registers a file as deleted.
"""
self._deleted_files.append(path)
def add_modified_file(self, path: Path) -> None:
"""
Registers a file as changed.
"""
self._modified_files.append(path)
def add_new_file(self, path: Path) -> None:
"""
Registers a file as new.
"""
self._new_files.append(path)
def has_updates(self) -> bool:
"""
Returns whether this summary has any updates.
"""
return bool(self._new_files or self._modified_files or self._deleted_files)

View File

@ -1,72 +0,0 @@
"""
General downloaders useful in many situations
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import requests
import requests.auth
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
@dataclass
class HttpDownloadInfo(Transformable):
"""
This class describes a single file to be downloaded.
"""
url: str
parameters: Dict[str, Any] = field(default_factory=dict)
class HttpDownloader:
"""A HTTP downloader that can handle HTTP basic auth."""
def __init__(
self,
tmp_dir: TmpDir,
organizer: Organizer,
username: Optional[str],
password: Optional[str],
):
"""Create a new http downloader."""
self._organizer = organizer
self._tmp_dir = tmp_dir
self._username = username
self._password = password
self._session = self._build_session()
def _build_session(self) -> requests.Session:
session = requests.Session()
if self._username and self._password:
session.auth = requests.auth.HTTPBasicAuth(
self._username, self._password
)
return session
def download_all(self, infos: List[HttpDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: HttpDownloadInfo) -> None:
"""
Download a single file.
"""
with self._session.get(info.url, params=info.parameters, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path)
else:
# TODO use proper exception
raise Exception(f"Could not download file, got response {response.status_code}")

View File

@ -1,57 +0,0 @@
"""
An error logging decorator.
"""
import logging
from typing import Any, Callable, TypeVar, cast
from rich.console import Console
from .logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class FatalException(Exception):
"""
A fatal exception occurred. Recovery is not possible.
"""
TFun = TypeVar('TFun', bound=Callable[..., Any])
def swallow_and_print_errors(function: TFun) -> TFun:
"""
Decorates a function, swallows all errors, logs them and returns none if one occurred.
"""
def inner(*args: Any, **kwargs: Any) -> Any:
# pylint: disable=broad-except
try:
return function(*args, **kwargs)
except FatalException as error:
PRETTY.error(str(error))
return None
except Exception as error:
Console().print_exception()
return None
return cast(TFun, inner)
def retry_on_io_exception(max_retries: int, message: str) -> Callable[[TFun], TFun]:
"""
Decorates a function and retries it on any exception until the max retries count is hit.
"""
def retry(function: TFun) -> TFun:
def inner(*args: Any, **kwargs: Any) -> Any:
for i in range(0, max_retries):
# pylint: disable=broad-except
try:
return function(*args, **kwargs)
except IOError as error:
PRETTY.warning(f"Error duing operation '{message}': {error}")
PRETTY.warning(
f"Retrying operation '{message}'. Remaining retries: {max_retries - 1 - i}")
return cast(TFun, inner)
return retry

View File

@ -1,10 +0,0 @@
"""
Synchronizing files from ILIAS instances (https://www.ilias.de/).
"""
from .authenticators import IliasAuthenticator, KitShibbolethAuthenticator
from .crawler import (IliasCrawler, IliasCrawlerEntry, IliasDirectoryFilter,
IliasElementType)
from .downloader import (IliasDownloader, IliasDownloadInfo,
IliasDownloadStrategy, download_everything,
download_modified_or_new)

View File

@ -1,138 +0,0 @@
"""
Authenticators that can obtain proper ILIAS session cookies.
"""
import abc
import logging
from typing import Optional
import bs4
import requests
from ..authenticators import TfaAuthenticator, UserPassAuthenticator
from ..utils import soupify
LOGGER = logging.getLogger(__name__)
class IliasAuthenticator(abc.ABC):
# pylint: disable=too-few-public-methods
"""
An authenticator that logs an existing requests session into an ILIAS
account.
"""
@abc.abstractmethod
def authenticate(self, sess: requests.Session) -> None:
"""
Log a requests session into this authenticator's ILIAS account.
"""
class KitShibbolethAuthenticator(IliasAuthenticator):
# pylint: disable=too-few-public-methods
"""
Authenticate via KIT's shibboleth system.
"""
def __init__(self, authenticator: Optional[UserPassAuthenticator] = None) -> None:
if authenticator:
self._auth = authenticator
else:
self._auth = UserPassAuthenticator("KIT ILIAS Shibboleth")
self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth")
def authenticate(self, sess: requests.Session) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Mit KIT-Account anmelden" button in
# https://ilias.studium.kit.edu/login.php
LOGGER.debug("Begin authentication process with ILIAS")
url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
data = {
"sendLogin": "1",
"idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
}
soup = soupify(sess.post(url, data=data))
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth using credentials")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_username": self._auth.username,
"j_password": self._auth.password,
"csrf_token": csrf_token
}
soup = soupify(sess.post(url, data=data))
if self._tfa_required(soup):
soup = self._authenticate_tfa(sess, soup)
if not self._login_successful(soup):
print("Incorrect credentials.")
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
LOGGER.debug("Redirect back to ILIAS with login information")
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
data = { # using the info obtained in the while loop above
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
sess.post(url, data=data)
def _authenticate_tfa(
self,
session: requests.Session,
soup: bs4.BeautifulSoup
) -> bs4.BeautifulSoup:
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"method": "post"})
action = form["action"]
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth with TFA token")
url = "https://idp.scc.kit.edu" + action
data = {
"_eventId_proceed": "",
"j_tokenNumber": self._tfa_auth.get_token()
}
return soupify(session.post(url, data=data))
@staticmethod
def _login_successful(soup: bs4.BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id="j_tokenNumber") is not None

View File

@ -1,684 +0,0 @@
"""
Contains an ILIAS crawler alongside helper functions.
"""
import datetime
import json
import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
urlunsplit)
import bs4
import requests
from ..errors import FatalException, retry_on_io_exception
from ..logging import PrettyLogger
from ..utils import soupify
from .authenticators import IliasAuthenticator
from .date_demangler import demangle_date
from .downloader import IliasDownloadInfo
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-")
class IliasElementType(Enum):
"""
The type of an ilias element.
"""
REGULAR_FOLDER = "REGULAR_FOLDER"
VIDEO_FOLDER = "VIDEO_FOLDER"
EXERCISE_FOLDER = "EXERCISE_FOLDER"
REGULAR_FILE = "REGULAR_FILE"
VIDEO_FILE = "VIDEO_FILE"
FORUM = "FORUM"
MEETING = "MEETING"
EXTERNAL_LINK = "EXTERNAL_LINK"
def is_folder(self) -> bool:
"""
Returns whether this type is some kind of folder.
"""
return "FOLDER" in str(self.name)
IliasDirectoryFilter = Callable[[Path, IliasElementType], bool]
class IliasCrawlerEntry:
# pylint: disable=too-few-public-methods
"""
An ILIAS crawler entry used internally to find, catalogue and recursively crawl elements.
"""
def __init__(
self,
path: Path,
url: Union[str, Callable[[], Optional[str]]],
entry_type: IliasElementType,
modification_date: Optional[datetime.datetime]
):
self.path = path
if isinstance(url, str):
str_url = url
self.url: Callable[[], Optional[str]] = lambda: str_url
else:
self.url = url
self.entry_type = entry_type
self.modification_date = modification_date
def to_download_info(self) -> Optional[IliasDownloadInfo]:
"""
Converts this crawler entry to an IliasDownloadInfo, if possible.
This method will only succeed for *File* types.
"""
if self.entry_type in [IliasElementType.REGULAR_FILE, IliasElementType.VIDEO_FILE]:
return IliasDownloadInfo(self.path, self.url, self.modification_date)
return None
class IliasCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for ILIAS.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
base_url: str,
session: requests.Session,
authenticator: IliasAuthenticator,
dir_filter: IliasDirectoryFilter
):
"""
Create a new ILIAS crawler.
"""
self._base_url = base_url
self._session = session
self._authenticator = authenticator
self.dir_filter = dir_filter
@staticmethod
def _url_set_query_param(url: str, param: str, value: str) -> str:
"""
Set a query parameter in an url, overwriting existing ones with the same name.
"""
scheme, netloc, path, query, fragment = urlsplit(url)
query_parameters = parse_qs(query)
query_parameters[param] = [value]
new_query_string = urlencode(query_parameters, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]:
"""
Crawls a given url *and all reachable elements in it*.
Args:
url {str} -- the *full* url to crawl
"""
start_entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), url)
return self._iterate_entries_to_download_infos(start_entries)
def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
"""
Starts the crawl process for a course, yielding a list of elements to (potentially)
download.
Arguments:
course_id {str} -- the course id
Raises:
FatalException: if an unrecoverable error occurs or the course id is not valid
"""
# Start crawling at the given course
root_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
if not self._is_course_id_valid(root_url, course_id):
raise FatalException(
"Invalid course id? I didn't find anything looking like a course!"
)
# And treat it as a folder
entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), root_url)
return self._iterate_entries_to_download_infos(entries)
def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
response: requests.Response = self._session.get(root_url)
# We were redirected ==> Non-existant ID
if course_id not in response.url:
return False
link_element: bs4.Tag = self._get_page(root_url, {}).find(id="current_perma_link")
if not link_element:
return False
# It wasn't a course but a category list, forum, etc.
return "crs_" in link_element.get("value")
def find_course_name(self, course_id: str) -> Optional[str]:
"""
Returns the name of a given course. None if it is not a valid course
or it could not be found.
"""
course_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
return self.find_element_name(course_url)
def find_element_name(self, url: str) -> Optional[str]:
"""
Returns the name of the element at the given URL, if it can find one.
"""
focus_element: bs4.Tag = self._get_page(url, {}).find(id="il_mhead_t_focus")
if not focus_element:
return None
return focus_element.text
def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
"""
Crawls the ILIAS personal desktop (and every subelements that can be reached from there).
Raises:
FatalException: if an unrecoverable error occurs
"""
entries: List[IliasCrawlerEntry] = self._crawl_folder(
Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI"
)
return self._iterate_entries_to_download_infos(entries)
def _iterate_entries_to_download_infos(
self,
entries: List[IliasCrawlerEntry]
) -> List[IliasDownloadInfo]:
result: List[IliasDownloadInfo] = []
entries_to_process: List[IliasCrawlerEntry] = entries.copy()
while len(entries_to_process) > 0:
entry = entries_to_process.pop()
if entry.entry_type == IliasElementType.EXTERNAL_LINK:
PRETTY.not_searching(entry.path, "external link")
continue
if entry.entry_type == IliasElementType.FORUM:
PRETTY.not_searching(entry.path, "forum")
continue
if entry.entry_type.is_folder() and not self.dir_filter(entry.path, entry.entry_type):
PRETTY.not_searching(entry.path, "user filter")
continue
download_info = entry.to_download_info()
if download_info is not None:
result.append(download_info)
continue
url = entry.url()
if url is None:
PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it")
continue
PRETTY.searching(entry.path)
if entry.entry_type == IliasElementType.EXERCISE_FOLDER:
entries_to_process += self._crawl_exercises(entry.path, url)
continue
if entry.entry_type == IliasElementType.REGULAR_FOLDER:
entries_to_process += self._crawl_folder(entry.path, url)
continue
if entry.entry_type == IliasElementType.VIDEO_FOLDER:
entries_to_process += self._crawl_video_directory(entry.path, url)
continue
PRETTY.warning(f"Unknown type: {entry.entry_type}!")
return result
def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl all files in a folder-like element.
"""
soup = self._get_page(url, {})
if soup.find(id="headerimage"):
element: bs4.Tag = soup.find(id="headerimage")
if "opencast" in element.attrs["src"].lower():
PRETTY.warning(f"Switched to crawling a video at {folder_path}")
if not self.dir_filter(folder_path, IliasElementType.VIDEO_FOLDER):
PRETTY.not_searching(folder_path, "user filter")
return []
return self._crawl_video_directory(folder_path, url)
result: List[IliasCrawlerEntry] = []
# Fetch all links and throw them to the general interpreter
links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
for link in links:
abs_url = self._abs_url_from_link(link)
element_path = Path(folder_path, _sanitize_path_name(link.getText().strip()))
element_type = self._find_type_from_link(element_path, link, abs_url)
if element_type == IliasElementType.REGULAR_FILE:
result += self._crawl_file(folder_path, link, abs_url)
elif element_type == IliasElementType.MEETING:
meeting_name = str(element_path.name)
date_portion_str = meeting_name.split(" - ")[0]
date_portion = demangle_date(date_portion_str)
if not date_portion:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
continue
rest_of_name = meeting_name
if rest_of_name.startswith(date_portion_str):
rest_of_name = rest_of_name[len(date_portion_str):]
new_name = datetime.datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") \
+ rest_of_name
new_path = Path(folder_path, _sanitize_path_name(new_name))
result += [
IliasCrawlerEntry(new_path, abs_url, IliasElementType.REGULAR_FOLDER, None)
]
elif element_type is not None:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
else:
PRETTY.warning(f"Found element without a type at {str(element_path)!r}")
return result
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._base_url, link_tag.get("href"))
@staticmethod
def _find_type_from_link(
path: Path,
link_element: bs4.Tag,
url: str
) -> Optional[IliasElementType]:
"""
Decides which sub crawler to use for a given top level element.
"""
parsed_url = urlparse(url)
LOGGER.debug("Parsed url: %r", parsed_url)
# file URLs contain "target=file"
if "target=file_" in parsed_url.query:
return IliasElementType.REGULAR_FILE
# Skip forums
if "cmd=showThreads" in parsed_url.query:
return IliasElementType.FORUM
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query:
return IliasCrawler._find_type_from_folder_like(link_element, url)
PRETTY.warning(
"Got unknown element type in switch. I am not sure what horror I found on the"
f" ILIAS page. The element was at {str(path)!r} and it is {link_element!r})"
)
return None
@staticmethod
def _find_type_from_folder_like(link_element: bs4.Tag, url: str) -> Optional[IliasElementType]:
"""
Try crawling something that looks like a folder.
"""
# pylint: disable=too-many-return-statements
found_parent: Optional[bs4.Tag] = None
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
found_parent = parent
break
if found_parent is None:
PRETTY.warning(f"Could not find element icon for {url!r}")
return None
# Find the small descriptive icon to figure out the type
img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
PRETTY.warning(f"Could not find image tag for {url!r}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE_FOLDER
if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.EXTERNAL_LINK
if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM
if str(img_tag["src"]).endswith("sess.svg"):
return IliasElementType.MEETING
return IliasElementType.REGULAR_FOLDER
@staticmethod
def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasCrawlerEntry]:
"""
Crawls a file.
"""
# Files have a list of properties (type, modification date, size, etc.)
# In a series of divs.
# Find the parent containing all those divs, so we can filter our what we need
properties_parent: bs4.Tag = link_element.findParent(
"div", {"class": lambda x: "il_ContainerListItem" in x}
).select_one(".il_ItemProperties")
# The first one is always the filetype
file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()
# The rest does not have a stable order. Grab the whole text and reg-ex the date
# out of it
all_properties_text = properties_parent.getText().strip()
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
all_properties_text
)
if modification_date_match is None:
modification_date = None
PRETTY.warning(f"Could not extract start date from {all_properties_text!r}")
else:
modification_date_str = modification_date_match.group(1)
modification_date = demangle_date(modification_date_str)
# Grab the name from the link text
name = _sanitize_path_name(link_element.getText())
full_path = Path(path, name + "." + file_type)
return [
IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date)
]
def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl the video overview site.
"""
initial_soup = self._get_page(url, {})
# The page is actually emtpy but contains a much needed token in the link below.
# That token can be used to fetch the *actual* video listing
content_link: bs4.Tag = initial_soup.select_one("#tab_series a")
# Fetch the actual video listing. The given parameters return all videos (max 800)
# in a standalone html page
video_list_soup = self._get_page(
self._abs_url_from_link(content_link),
{"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
# If we find a page selected, we probably need to respect pagination
if self._is_paginated_video_page(video_list_soup):
second_stage_url = self._abs_url_from_link(content_link)
return self._crawl_paginated_video_directory(
video_dir_path, video_list_soup, second_stage_url
)
return self._crawl_video_directory_second_stage(video_dir_path, video_list_soup)
@staticmethod
def _is_paginated_video_page(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id=re.compile(r"tab_page_sel.+")) is not None
def _crawl_paginated_video_directory(
self,
video_dir_path: Path,
paged_video_list_soup: bs4.BeautifulSoup,
second_stage_url: str
) -> List[IliasCrawlerEntry]:
LOGGER.info("Found paginated video page, trying 800 elements")
# Try to find the table id. This can be used to build the query parameter indicating
# you want 800 elements
table_element: bs4.Tag = paged_video_list_soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
if table_element is None:
PRETTY.warning(
"Could not increase elements per page (table not found)."
" Some might not be crawled!"
)
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if match is None:
PRETTY.warning(
"Could not increase elements per page (table id not found)."
" Some might not be crawled!"
)
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
table_id = match.group(1)
extended_video_page = self._get_page(
second_stage_url,
{f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
if self._is_paginated_video_page(extended_video_page):
PRETTY.warning(
"800 elements do not seem to be enough (or I failed to fetch that many)."
" I will miss elements."
)
return self._crawl_video_directory_second_stage(video_dir_path, extended_video_page)
def _crawl_video_directory_second_stage(
self,
video_dir_path: Path,
video_list_soup: bs4.BeautifulSoup
) -> List[IliasCrawlerEntry]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
direct_download_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Download\s*")
)
# Video start links are marked with an "Abspielen" link
video_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*")
)
results: List[IliasCrawlerEntry] = []
# We can download everything directly!
# FIXME: Sadly the download button is currently broken, so never do that
if False and len(direct_download_links) == len(video_links):
for link in direct_download_links:
results += self._crawl_single_video(video_dir_path, link, True)
else:
for link in video_links:
results += self._crawl_single_video(video_dir_path, link, False)
return results
def _crawl_single_video(
self,
parent_path: Path,
link: bs4.Tag,
direct_download: bool
) -> List[IliasCrawlerEntry]:
"""
Crawl a single video based on its "Abspielen" link from the video listing.
"""
# The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)"
).getText().strip()
modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
title = link.parent.parent.parent.select_one(
"td.std:nth-child(3)"
).getText().strip()
title += ".mp4"
video_path: Path = Path(parent_path, _sanitize_path_name(title))
video_url = self._abs_url_from_link(link)
# The video had a direct download button we can use instead
if direct_download:
LOGGER.debug("Using direct download for video %r", str(video_path))
return [IliasCrawlerEntry(
video_path, video_url, IliasElementType.VIDEO_FILE, modification_time
)]
return [IliasCrawlerEntry(
video_path,
self._crawl_video_url_from_play_link(video_url),
IliasElementType.VIDEO_FILE,
modification_time
)]
def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Optional[str]]:
def inner() -> Optional[str]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
video_page_soup = soupify(self._session.get(play_url))
regex: re.Pattern = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(video_page_soup))
if json_match is None:
PRETTY.warning(f"Could not find json stream info for {play_url!r}")
return None
json_str = json_match.group(1)
# parse it
json_object = json.loads(json_str)
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return video_url
return inner
def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl files offered for download in exercises.
"""
soup = self._get_page(url, {})
results: List[IliasCrawlerEntry] = []
# Each assignment is in an accordion container
assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer")
for container in assignment_containers:
# Fetch the container name out of the header to use it in the path
container_name = container.select_one(".ilAssignmentHeader").getText().strip()
# Find all download links in the container (this will contain all the files)
files: List[bs4.Tag] = container.findAll(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
text="Download"
)
LOGGER.debug("Found exercise container %r", container_name)
# Grab each file as you now have the link
for file_link in files:
# Two divs, side by side. Left is the name, right is the link ==> get left
# sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip()
file_name = _sanitize_path_name(file_name)
url = self._abs_url_from_link(file_link)
LOGGER.debug("Found file %r at %r", file_name, url)
results.append(IliasCrawlerEntry(
Path(element_path, container_name, file_name),
url,
IliasElementType.REGULAR_FILE,
None # We do not have any timestamp
))
return results
@retry_on_io_exception(3, "fetching webpage")
def _get_page(self, url: str, params: Dict[str, Any],
retry_count: int = 0) -> bs4.BeautifulSoup:
"""
Fetches a page from ILIAS, authenticating when needed.
"""
if retry_count >= 4:
raise FatalException("Could not get a proper page after 4 tries. "
"Maybe your URL is wrong, authentication fails continuously, "
"your ILIAS connection is spotty or ILIAS is not well.")
LOGGER.debug("Fetching %r", url)
response = self._session.get(url, params=params)
content_type = response.headers["content-type"]
if not content_type.startswith("text/html"):
raise FatalException(
f"Invalid content type {content_type} when crawling ilias page"
" {url!r} with {params!r}"
)
soup = soupify(response)
if self._is_logged_in(soup):
return soup
LOGGER.info("Not authenticated, changing that...")
self._authenticator.authenticate(self._session)
return self._get_page(url, params, retry_count + 1)
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
# Normal ILIAS pages
userlog = soup.find("li", {"id": "userlog"})
if userlog is not None:
LOGGER.debug("Auth: Found #userlog")
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
LOGGER.debug("Auth: Found #tbl_xoct.+")
return True
# The individual video player wrapper page has nothing of the above.
# Match it by its playerContainer.
if soup.select_one("#playerContainer") is not None:
LOGGER.debug("Auth: Found #playerContainer")
return True
return False

View File

@ -1,51 +0,0 @@
"""
Helper methods to demangle an ILIAS date.
"""
import datetime
import locale
import logging
import re
from typing import Optional
from ..logging import PrettyLogger
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
def demangle_date(date: str) -> Optional[datetime.datetime]:
"""
Demangle a given date in one of the following formats:
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon yyyy, HH:MM
"""
saved = locale.setlocale(locale.LC_ALL)
try:
try:
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
except locale.Error:
PRETTY.warning(
"Could not set language to german. Assuming you use english everywhere."
)
date = re.sub(r"\s+", " ", date)
date = re.sub("Gestern|Yesterday", _yesterday().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I)
return datetime.datetime.strptime(date, "%d. %b %Y, %H:%M")
except ValueError:
PRETTY.warning(f"Could not parse date {date!r}")
return None
finally:
locale.setlocale(locale.LC_ALL, saved)
def _yesterday() -> datetime.date:
return datetime.date.today() - datetime.timedelta(days=1)
def _tomorrow() -> datetime.date:
return datetime.date.today() + datetime.timedelta(days=1)

View File

@ -1,173 +0,0 @@
"""Contains a downloader for ILIAS."""
import datetime
import logging
import math
import os
from pathlib import Path, PurePath
from typing import Callable, List, Optional, Union
import bs4
import requests
from ..errors import retry_on_io_exception
from ..logging import PrettyLogger
from ..organizer import Organizer
from ..tmp_dir import TmpDir
from ..transform import Transformable
from ..utils import soupify, stream_to_path
from .authenticators import IliasAuthenticator
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class ContentTypeException(Exception):
"""Thrown when the content type of the ilias element can not be handled."""
class IliasDownloadInfo(Transformable):
"""
This class describes a single file to be downloaded.
"""
def __init__(
self,
path: PurePath,
url: Union[str, Callable[[], Optional[str]]],
modifcation_date: Optional[datetime.datetime]
):
super().__init__(path)
if isinstance(url, str):
string_url = url
self.url: Callable[[], Optional[str]] = lambda: string_url
else:
self.url = url
self.modification_date = modifcation_date
IliasDownloadStrategy = Callable[[Organizer, IliasDownloadInfo], bool]
def download_everything(organizer: Organizer, info: IliasDownloadInfo) -> bool:
# pylint: disable=unused-argument
"""
Accepts everything.
"""
return True
def download_modified_or_new(organizer: Organizer, info: IliasDownloadInfo) -> bool:
"""
Accepts new files or files with a more recent modification date.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists() or info.modification_date is None:
return True
resolved_mod_time_seconds = resolved_file.stat().st_mtime
# Download if the info is newer
if info.modification_date.timestamp() > resolved_mod_time_seconds:
return True
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
return False
class IliasDownloader:
# pylint: disable=too-many-arguments
"""A downloader for ILIAS."""
def __init__(
self,
tmp_dir: TmpDir,
organizer: Organizer,
session: requests.Session,
authenticator: IliasAuthenticator,
strategy: IliasDownloadStrategy,
timeout: int = 5
):
"""
Create a new IliasDownloader.
The timeout applies to the download request only, as bwcloud uses IPv6
and requests has a problem with that: https://github.com/psf/requests/issues/5522
"""
self._tmp_dir = tmp_dir
self._organizer = organizer
self._session = session
self._authenticator = authenticator
self._strategy = strategy
self._timeout = timeout
def download_all(self, infos: List[IliasDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: IliasDownloadInfo) -> None:
"""
Download a file from ILIAS.
Retries authentication until eternity if it could not fetch the file.
"""
LOGGER.debug("Downloading %r", info)
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
tmp_file = self._tmp_dir.new_path()
@retry_on_io_exception(3, "downloading file")
def download_impl() -> bool:
if not self._try_download(info, tmp_file):
LOGGER.info("Re-Authenticating due to download failure: %r", info)
self._authenticator.authenticate(self._session)
raise IOError("Scheduled retry")
else:
return True
if not download_impl():
PRETTY.error(f"Download of file {info.path} failed too often! Skipping it...")
return
dst_path = self._organizer.accept_file(tmp_file, info.path)
if dst_path and info.modification_date:
os.utime(
dst_path,
times=(
math.ceil(info.modification_date.timestamp()),
math.ceil(info.modification_date.timestamp())
)
)
def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
url = info.url()
if url is None:
PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/")
return True
with self._session.get(url, stream=True, timeout=self._timeout) as response:
content_type = response.headers["content-type"]
has_content_disposition = "content-disposition" in response.headers
if content_type.startswith("text/html") and not has_content_disposition:
if self._is_logged_in(soupify(response)):
raise ContentTypeException("Attempting to download a web page, not a file")
return False
# Yay, we got the file :)
stream_to_path(response, target, info.path.name)
return True
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
userlog = soup.find("li", {"id": "userlog"})
return userlog is not None

View File

@ -1,154 +0,0 @@
"""
Utility functions and a scraper/downloader for the IPD pages.
"""
import datetime
import logging
import math
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, List, Optional
from urllib.parse import urljoin
import bs4
import requests
from PFERD.errors import FatalException
from PFERD.utils import soupify
from .logging import PrettyLogger
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
@dataclass
class IpdDownloadInfo(Transformable):
"""
Information about an ipd entry.
"""
url: str
modification_date: Optional[datetime.datetime]
IpdDownloadStrategy = Callable[[Organizer, IpdDownloadInfo], bool]
def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) -> bool:
"""
Accepts new files or files with a more recent modification date.
"""
resolved_file = organizer.resolve(info.path)
if not resolved_file.exists():
return True
if not info.modification_date:
PRETTY.ignored_file(info.path, "could not find modification time, file exists")
return False
resolved_mod_time_seconds = resolved_file.stat().st_mtime
# Download if the info is newer
if info.modification_date.timestamp() > resolved_mod_time_seconds:
return True
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
return False
class IpdCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for IPD pages.
"""
def __init__(self, base_url: str):
self._base_url = base_url
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._base_url, link_tag.get("href"))
def crawl(self) -> List[IpdDownloadInfo]:
"""
Crawls the playlist given in the constructor.
"""
page = soupify(requests.get(self._base_url))
items: List[IpdDownloadInfo] = []
def is_relevant_url(x: str) -> bool:
return x.endswith(".pdf") or x.endswith(".c") or x.endswith(".java") or x.endswith(".zip")
for link in page.findAll(name="a", attrs={"href": lambda x: x and is_relevant_url(x)}):
href: str = link.attrs.get("href")
name = href.split("/")[-1]
modification_date: Optional[datetime.datetime] = None
try:
enclosing_row: bs4.Tag = link.findParent(name="tr")
if enclosing_row:
date_text = enclosing_row.find(name="td").text
modification_date = datetime.datetime.strptime(date_text, "%d.%m.%Y")
except ValueError:
modification_date = None
items.append(IpdDownloadInfo(
Path(name),
url=self._abs_url_from_link(link),
modification_date=modification_date
))
return items
class IpdDownloader:
"""
A downloader for ipd files.
"""
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy):
self._tmp_dir = tmp_dir
self._organizer = organizer
self._strategy = strategy
self._session = requests.session()
def download_all(self, infos: List[IpdDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
def download(self, info: IpdDownloadInfo) -> None:
"""
Download a single file.
"""
if not self._strategy(self._organizer, info):
self._organizer.mark(info.path)
return
with self._session.get(info.url, stream=True) as response:
if response.status_code == 200:
tmp_file = self._tmp_dir.new_path()
stream_to_path(response, tmp_file, info.path.name)
dst_path = self._organizer.accept_file(tmp_file, info.path)
if dst_path and info.modification_date:
os.utime(
dst_path,
times=(
math.ceil(info.modification_date.timestamp()),
math.ceil(info.modification_date.timestamp())
)
)
elif response.status_code == 403:
raise FatalException("Received 403. Are you not using the KIT VPN?")
else:
PRETTY.warning(f"Could not download file, got response {response.status_code}")

View File

@ -1,41 +0,0 @@
"""
Contains a Location class for objects with an inherent path.
"""
from pathlib import Path, PurePath
class ResolveException(Exception):
"""An exception while resolving a file."""
# TODO take care of this when doing exception handling
class Location:
"""
An object that has an inherent path.
"""
def __init__(self, path: Path):
self._path = path.resolve()
@property
def path(self) -> Path:
"""
This object's location.
"""
return self._path
def resolve(self, target: PurePath) -> Path:
"""
Resolve a file relative to the path of this location.
Raises a [ResolveException] if the file is outside the given directory.
"""
absolute_path = self.path.joinpath(target).resolve()
# TODO Make this less inefficient
if self.path not in absolute_path.parents:
raise ResolveException(f"Path {target} is not inside directory {self.path}")
return absolute_path

View File

@ -1,184 +0,0 @@
"""
Contains a few logger utility functions and implementations.
"""
import logging
from typing import Optional
from rich._log_render import LogRender
from rich.console import Console
from rich.style import Style
from rich.text import Text
from rich.theme import Theme
from .download_summary import DownloadSummary
from .utils import PathLike, to_path
STYLE = "{"
FORMAT = "[{levelname:<7}] {message}"
DATE_FORMAT = "%F %T"
def enable_logging(name: str = "PFERD", level: int = logging.INFO) -> None:
"""
Enable and configure logging via the logging module.
"""
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(RichLoggingHandler(level=level))
# This should be logged by our own handler, and not the root logger's
# default handler, so we don't pass it on to the root logger.
logger.propagate = False
class RichLoggingHandler(logging.Handler):
"""
A logging handler that uses rich for highlighting
"""
def __init__(self, level: int) -> None:
super().__init__(level=level)
self.console = Console(theme=Theme({
"logging.level.warning": Style(color="yellow")
}))
self._log_render = LogRender(show_level=True, show_time=False, show_path=False)
def emit(self, record: logging.LogRecord) -> None:
"""
Invoked by logging.
"""
log_style = f"logging.level.{record.levelname.lower()}"
message = self.format(record)
level = Text()
level.append(record.levelname, log_style)
message_text = Text.from_markup(message)
self.console.print(
self._log_render(
self.console,
[message_text],
level=level,
)
)
class PrettyLogger:
"""
A logger that prints some specially formatted log messages in color.
"""
def __init__(self, logger: logging.Logger) -> None:
self.logger = logger
@staticmethod
def _format_path(path: PathLike) -> str:
return repr(str(to_path(path)))
def error(self, message: str) -> None:
"""
Print an error message indicating some operation fatally failed.
"""
self.logger.error(
f"[bold red]{message}[/bold red]"
)
def warning(self, message: str) -> None:
"""
Print a warning message indicating some operation failed, but the error can be recovered
or ignored.
"""
self.logger.warning(
f"[bold yellow]{message}[/bold yellow]"
)
def modified_file(self, path: PathLike) -> None:
"""
An existing file has changed.
"""
self.logger.info(
f"[bold magenta]Modified {self._format_path(path)}.[/bold magenta]"
)
def new_file(self, path: PathLike) -> None:
"""
A new file has been downloaded.
"""
self.logger.info(
f"[bold green]Created {self._format_path(path)}.[/bold green]"
)
def deleted_file(self, path: PathLike) -> None:
"""
A file has been deleted.
"""
self.logger.info(
f"[bold red]Deleted {self._format_path(path)}.[/bold red]"
)
def ignored_file(self, path: PathLike, reason: str) -> None:
"""
File was not downloaded or modified.
"""
self.logger.info(
f"[dim]Ignored {self._format_path(path)} "
f"([/dim]{reason}[dim]).[/dim]"
)
def searching(self, path: PathLike) -> None:
"""
A crawler searches a particular object.
"""
self.logger.info(f"Searching {self._format_path(path)}")
def not_searching(self, path: PathLike, reason: str) -> None:
"""
A crawler does not search a particular object.
"""
self.logger.info(
f"[dim]Not searching {self._format_path(path)} "
f"([/dim]{reason}[dim]).[/dim]"
)
def summary(self, download_summary: DownloadSummary) -> None:
"""
Prints a download summary.
"""
self.logger.info("")
self.logger.info("[bold cyan]Download Summary[/bold cyan]")
if not download_summary.has_updates():
self.logger.info("[bold dim]Nothing changed![/bold dim]")
return
for new_file in download_summary.new_files:
self.new_file(new_file)
for modified_file in download_summary.modified_files:
self.modified_file(modified_file)
for deleted_files in download_summary.deleted_files:
self.deleted_file(deleted_files)
def starting_synchronizer(
self,
target_directory: PathLike,
synchronizer_name: str,
subject: Optional[str] = None,
) -> None:
"""
A special message marking that a synchronizer has been started.
"""
subject_str = f"{subject} " if subject else ""
self.logger.info("")
self.logger.info((
f"[bold cyan]Synchronizing "
f"{subject_str}to {self._format_path(target_directory)} "
f"using the {synchronizer_name} synchronizer.[/bold cyan]"
))

View File

@ -1,224 +0,0 @@
"""A simple helper for managing downloaded files.
A organizer is bound to a single directory.
"""
import filecmp
import logging
import os
import shutil
from enum import Enum
from pathlib import Path, PurePath
from typing import Callable, List, Optional, Set
from .download_summary import DownloadSummary
from .location import Location
from .logging import PrettyLogger
from .utils import prompt_yes_no
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class ConflictType(Enum):
"""
The type of the conflict. A file might not exist anymore and will be deleted
or it might be overwritten with a newer version.
FILE_OVERWRITTEN: An existing file will be updated
MARKED_FILE_OVERWRITTEN: A file is written for the second+ time in this run
FILE_DELETED: The file was deleted
"""
FILE_OVERWRITTEN = "overwritten"
MARKED_FILE_OVERWRITTEN = "marked_file_overwritten"
FILE_DELETED = "deleted"
class FileConflictResolution(Enum):
"""
The reaction when confronted with a file conflict:
DESTROY_EXISTING: Delete/overwrite the current file
KEEP_EXISTING: Keep the current file
DEFAULT: Do whatever the PFERD authors thought is sensible
PROMPT: Interactively ask the user
"""
DESTROY_EXISTING = "destroy"
KEEP_EXISTING = "keep"
DEFAULT = "default"
PROMPT = "prompt"
FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution]
def resolve_prompt_user(_path: PurePath, conflict: ConflictType) -> FileConflictResolution:
"""
Resolves conflicts by asking the user if a file was written twice or will be deleted.
"""
if conflict == ConflictType.FILE_OVERWRITTEN:
return FileConflictResolution.DESTROY_EXISTING
return FileConflictResolution.PROMPT
class FileAcceptException(Exception):
"""An exception while accepting a file."""
class Organizer(Location):
"""A helper for managing downloaded files."""
def __init__(self, path: Path, conflict_resolver: FileConflictResolver = resolve_prompt_user):
"""Create a new organizer for a given path."""
super().__init__(path)
self._known_files: Set[Path] = set()
# Keep the root dir
self._known_files.add(path.resolve())
self.download_summary = DownloadSummary()
self.conflict_resolver = conflict_resolver
def accept_file(self, src: Path, dst: PurePath) -> Optional[Path]:
"""
Move a file to this organizer and mark it.
Returns the path the file was moved to, to allow the caller to adjust the metadata.
As you might still need to adjust the metadata when the file was identical
(e.g. update the timestamp), the path is also returned in this case.
In all other cases (ignored, not overwritten, etc.) this method returns None.
"""
# Windows limits the path length to 260 for *some* historical reason
# If you want longer paths, you will have to add the "\\?\" prefix in front of
# your path...
# See:
# https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation
if os.name == 'nt':
src_absolute = Path("\\\\?\\" + str(src.resolve()))
dst_absolute = Path("\\\\?\\" + str(self.resolve(dst)))
else:
src_absolute = src.resolve()
dst_absolute = self.resolve(dst)
if not src_absolute.exists():
raise FileAcceptException("Source file does not exist")
if not src_absolute.is_file():
raise FileAcceptException("Source is a directory")
LOGGER.debug("Copying %s to %s", src_absolute, dst_absolute)
if self._is_marked(dst):
PRETTY.warning(f"File {str(dst_absolute)!r} was already written!")
conflict = ConflictType.MARKED_FILE_OVERWRITTEN
if self._resolve_conflict("Overwrite file?", dst_absolute, conflict, default=False):
PRETTY.ignored_file(dst_absolute, "file was written previously")
return None
# Destination file is directory
if dst_absolute.exists() and dst_absolute.is_dir():
prompt = f"Overwrite folder {dst_absolute} with file?"
conflict = ConflictType.FILE_OVERWRITTEN
if self._resolve_conflict(prompt, dst_absolute, conflict, default=False):
shutil.rmtree(dst_absolute)
else:
PRETTY.warning(f"Could not add file {str(dst_absolute)!r}")
return None
# Destination file exists
if dst_absolute.exists() and dst_absolute.is_file():
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
# Bail out, nothing more to do
PRETTY.ignored_file(dst_absolute, "same file contents")
self.mark(dst)
return dst_absolute
prompt = f"Overwrite file {dst_absolute}?"
conflict = ConflictType.FILE_OVERWRITTEN
if not self._resolve_conflict(prompt, dst_absolute, conflict, default=True):
PRETTY.ignored_file(dst_absolute, "user conflict resolution")
return None
self.download_summary.add_modified_file(dst_absolute)
PRETTY.modified_file(dst_absolute)
else:
self.download_summary.add_new_file(dst_absolute)
PRETTY.new_file(dst_absolute)
# Create parent dir if needed
dst_parent_dir: Path = dst_absolute.parent
dst_parent_dir.mkdir(exist_ok=True, parents=True)
# Move file
shutil.move(str(src_absolute), str(dst_absolute))
self.mark(dst)
return dst_absolute
def mark(self, path: PurePath) -> None:
"""Mark a file as used so it will not get cleaned up."""
absolute_path = self.resolve(path)
self._known_files.add(absolute_path)
LOGGER.debug("Tracked %s", absolute_path)
def _is_marked(self, path: PurePath) -> bool:
"""
Checks whether a file is marked.
"""
absolute_path = self.resolve(path)
return absolute_path in self._known_files
def cleanup(self) -> None:
"""Remove all untracked files in the organizer's dir."""
LOGGER.debug("Deleting all untracked files...")
self._cleanup(self.path)
def _cleanup(self, start_dir: Path) -> None:
if not start_dir.exists():
return
paths: List[Path] = list(start_dir.iterdir())
# Recursively clean paths
for path in paths:
if path.is_dir():
self._cleanup(path)
else:
if path.resolve() not in self._known_files:
self._delete_file_if_confirmed(path)
# Delete dir if it was empty and untracked
dir_empty = len(list(start_dir.iterdir())) == 0
if start_dir.resolve() not in self._known_files and dir_empty:
start_dir.rmdir()
def _delete_file_if_confirmed(self, path: Path) -> None:
prompt = f"Do you want to delete {path}"
if self._resolve_conflict(prompt, path, ConflictType.FILE_DELETED, default=False):
self.download_summary.add_deleted_file(path)
path.unlink()
else:
PRETTY.ignored_file(path, "user conflict resolution")
def _resolve_conflict(
self, prompt: str, path: Path, conflict: ConflictType, default: bool
) -> bool:
if not self.conflict_resolver:
return prompt_yes_no(prompt, default=default)
result = self.conflict_resolver(path, conflict)
if result == FileConflictResolution.DEFAULT:
return default
if result == FileConflictResolution.KEEP_EXISTING:
return False
if result == FileConflictResolution.DESTROY_EXISTING:
return True
return prompt_yes_no(prompt, default=default)

View File

@ -1,111 +0,0 @@
"""
A small progress bar implementation.
"""
import sys
from dataclasses import dataclass
from types import TracebackType
from typing import Optional, Type
import requests
from rich.console import Console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
TextColumn, TimeRemainingColumn,
TransferSpeedColumn)
_progress: Progress = Progress(
TextColumn("[bold blue]{task.fields[name]}", justify="right"),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"",
DownloadColumn(),
"",
TransferSpeedColumn(),
"",
TimeRemainingColumn(),
console=Console(file=sys.stdout),
transient=True
)
def size_from_headers(response: requests.Response) -> Optional[int]:
"""
Return the size of the download based on the response headers.
Arguments:
response {requests.Response} -- the response
Returns:
Optional[int] -- the size
"""
if "Content-Length" in response.headers:
return int(response.headers["Content-Length"])
return None
@dataclass
class ProgressSettings:
"""
Settings you can pass to customize the progress bar.
"""
name: str
max_size: int
def progress_for(settings: Optional[ProgressSettings]) -> 'ProgressContextManager':
"""
Returns a context manager that displays progress
Returns:
ProgressContextManager -- the progress manager
"""
return ProgressContextManager(settings)
class ProgressContextManager:
"""
A context manager used for displaying progress.
"""
def __init__(self, settings: Optional[ProgressSettings]):
self._settings = settings
self._task_id: Optional[TaskID] = None
def __enter__(self) -> 'ProgressContextManager':
"""Context manager entry function."""
if not self._settings:
return self
_progress.start()
self._task_id = _progress.add_task(
self._settings.name,
total=self._settings.max_size,
name=self._settings.name
)
return self
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Removes the task."""
if self._task_id is None:
return None
_progress.remove_task(self._task_id)
if len(_progress.task_ids) == 0:
# We need to clean up after ourselves, as we were the last one
_progress.stop()
_progress.refresh()
return None
def advance(self, amount: float) -> None:
"""
Advances the progress bar.
"""
if self._task_id is not None:
_progress.advance(self._task_id, amount)

View File

@ -1,79 +0,0 @@
"""Helper functions and classes for temporary folders."""
import logging
import shutil
from pathlib import Path
from types import TracebackType
from typing import Optional, Type
from .location import Location
LOGGER = logging.getLogger(__name__)
class TmpDir(Location):
"""A temporary folder that can create files or nested temp folders."""
def __init__(self, path: Path):
"""Create a new temporary folder for the given path."""
super().__init__(path)
self._counter = 0
self.cleanup()
self.path.mkdir(parents=True, exist_ok=True)
def __str__(self) -> str:
"""Format the folder as a string."""
return f"Folder at {self.path}"
def __enter__(self) -> 'TmpDir':
"""Context manager entry function."""
return self
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Calls cleanup()."""
self.cleanup()
return None
def new_path(self, prefix: Optional[str] = None) -> Path:
"""
Return a unique path inside the directory. Doesn't create a file or
directory.
"""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
LOGGER.debug("Creating temp file %s", name)
return self.resolve(Path(name))
def new_subdir(self, prefix: Optional[str] = None) -> 'TmpDir':
"""
Create a new nested temporary folder and return it.
"""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
sub_path = self.resolve(Path(name))
sub_path.mkdir(parents=True)
LOGGER.debug("Creating temp dir %s at %s", name, sub_path)
return TmpDir(sub_path)
def cleanup(self) -> None:
"""Delete this folder and all contained files."""
LOGGER.debug("Deleting temp folder %s", self.path)
if self.path.resolve().exists():
shutil.rmtree(self.path.resolve())
def _inc_and_get_counter(self) -> int:
"""Get and increment the counter by one."""
counter = self._counter
self._counter += 1
return counter

View File

@ -1,142 +0,0 @@
"""
Transforms let the user define functions to decide where the downloaded files
should be placed locally. They let the user do more advanced things like moving
only files whose names match a regex, or renaming files from one numbering
scheme to another.
"""
import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Callable, List, Optional, TypeVar
from .utils import PathLike, Regex, to_path, to_pattern
Transform = Callable[[PurePath], Optional[PurePath]]
@dataclass
class Transformable:
"""
An object that can be transformed by a Transform.
"""
path: PurePath
TF = TypeVar("TF", bound=Transformable)
def apply_transform(
transform: Transform,
transformables: List[TF],
) -> List[TF]:
"""
Apply a Transform to multiple Transformables, discarding those that were
not transformed by the Transform.
"""
result: List[TF] = []
for transformable in transformables:
new_path = transform(transformable.path)
if new_path:
transformable.path = new_path
result.append(transformable)
return result
# Transform combinators
def keep(path: PurePath) -> Optional[PurePath]:
return path
def attempt(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
for transform in args:
result = transform(path)
if result:
return result
return None
return inner
def optionally(transform: Transform) -> Transform:
return attempt(transform, lambda path: path)
def do(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
current = path
for transform in args:
result = transform(current)
if result:
current = result
else:
return None
return current
return inner
def predicate(pred: Callable[[PurePath], bool]) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
if pred(path):
return path
return None
return inner
def glob(pattern: str) -> Transform:
return predicate(lambda path: path.match(pattern))
def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform:
source_path = to_path(source_dir)
target_path = to_path(target_dir)
def inner(path: PurePath) -> Optional[PurePath]:
if source_path in path.parents:
return target_path / path.relative_to(source_path)
return None
return inner
def move(source: PathLike, target: PathLike) -> Transform:
source_path = to_path(source)
target_path = to_path(target)
def inner(path: PurePath) -> Optional[PurePath]:
if path == source_path:
return target_path
return None
return inner
def rename(source: str, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
if path.name == source:
return path.with_name(target)
return None
return inner
def re_move(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(str(path))
if match:
groups = [match.group(0)]
groups.extend(match.groups())
return PurePath(target.format(*groups))
return None
return inner
def re_rename(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(path.name)
if match:
groups = [match.group(0)]
groups.extend(match.groups())
return path.with_name(target.format(*groups))
return None
return inner
def sanitize_windows_path(path: PurePath) -> PurePath:
"""
A small function to escape characters that are forbidden in windows path names.
This method is a no-op on other operating systems.
"""
# Escape windows illegal path characters
if os.name == 'nt':
sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x) for x in list(path.parts)]
return PurePath(*sanitized_parts)
return path