diff --git a/PFERD/ipd.py b/PFERD/ipd.py
new file mode 100644
index 0000000..33aaff1
--- /dev/null
+++ b/PFERD/ipd.py
@@ -0,0 +1,150 @@
+"""
+Utility functions and a scraper/downloader for the IPD pages.
+"""
+import datetime
+import logging
+import math
+import os
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable, List, Optional
+from urllib.parse import urljoin
+
+import bs4
+import requests
+
+from PFERD.errors import FatalException
+from PFERD.utils import soupify
+
+from .logging import PrettyLogger
+from .organizer import Organizer
+from .tmp_dir import TmpDir
+from .transform import Transformable
+from .utils import stream_to_path
+
+LOGGER = logging.getLogger(__name__)
+PRETTY = PrettyLogger(LOGGER)
+
+
+@dataclass
+class IpdDownloadInfo(Transformable):
+    """
+    Information about an IPD entry.
+    """
+    url: str
+    modification_date: Optional[datetime.datetime]
+
+
+IpdDownloadStrategy = Callable[[Organizer, IpdDownloadInfo], bool]
+
+
+def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) -> bool:
+    """
+    Accepts new files or files with a more recent modification date.
+    """
+    resolved_file = organizer.resolve(info.path)
+    if not resolved_file.exists():
+        return True
+    if not info.modification_date:
+        PRETTY.ignored_file(info.path, "could not find modification time, file exists")
+        return False
+
+    resolved_mod_time_seconds = resolved_file.stat().st_mtime
+
+    # Download if the remote file is newer than the local copy
+    if info.modification_date.timestamp() > resolved_mod_time_seconds:
+        return True
+
+    PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
+    return False
+
+
+class IpdCrawler:
+    # pylint: disable=too-few-public-methods
+    """
+    A crawler for IPD pages.
+    """
+
+    def __init__(self, base_url: str):
+        self._base_url = base_url
+
+    def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
+        """
+        Create an absolute url from an <a> tag.
+        """
+        return urljoin(self._base_url, link_tag.get("href"))
+
+    def crawl(self) -> List[IpdDownloadInfo]:
+        """
+        Crawls the IPD page given in the constructor.
+        """
+        page = soupify(requests.get(self._base_url))
+
+        items: List[IpdDownloadInfo] = []
+
+        for link in page.findAll(name="a", attrs={"href": lambda x: x and x.endswith("pdf")}):
+            href: str = link.attrs.get("href")
+            name = href.split("/")[-1]
+
+            modification_date: Optional[datetime.datetime]
+            try:
+                enclosing_row: bs4.Tag = link.findParent(name="tr")
+                date_text = enclosing_row.find(name="td").text
+                modification_date = datetime.datetime.strptime(date_text, "%d.%m.%Y")
+            except ValueError:
+                modification_date = None
+
+            items.append(IpdDownloadInfo(
+                Path(name),
+                url=self._abs_url_from_link(link),
+                modification_date=modification_date
+            ))
+
+        return items
+
+
+class IpdDownloader:
+    """
+    A downloader for IPD files.
+    """
+
+    def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy):
+        self._tmp_dir = tmp_dir
+        self._organizer = organizer
+        self._strategy = strategy
+        self._session = requests.session()
+
+    def download_all(self, infos: List[IpdDownloadInfo]) -> None:
+        """
+        Download multiple files one after the other.
+        """
+        for info in infos:
+            self.download(info)
+
+    def download(self, info: IpdDownloadInfo) -> None:
+        """
+        Download a single file.
+ """ + if not self._strategy(self._organizer, info): + self._organizer.mark(info.path) + return + + with self._session.get(info.url, stream=True) as response: + if response.status_code == 200: + tmp_file = self._tmp_dir.new_path() + stream_to_path(response, tmp_file, info.path.name) + dst_path = self._organizer.accept_file(tmp_file, info.path) + + if dst_path and info.modification_date: + os.utime( + dst_path, + times=( + math.ceil(info.modification_date.timestamp()), + math.ceil(info.modification_date.timestamp()) + ) + ) + + elif response.status_code == 403: + raise FatalException("Received 403. Are you not using the KIT VPN?") + else: + PRETTY.warning(f"Could not download file, got response {response.status_code}") diff --git a/PFERD/pferd.py b/PFERD/pferd.py index 042dd93..f57f078 100644 --- a/PFERD/pferd.py +++ b/PFERD/pferd.py @@ -14,6 +14,8 @@ from .errors import FatalException, swallow_and_print_errors from .ilias import (IliasAuthenticator, IliasCrawler, IliasDirectoryFilter, IliasDownloader, IliasDownloadInfo, IliasDownloadStrategy, KitShibbolethAuthenticator, download_modified_or_new) +from .ipd import (IpdCrawler, IpdDownloader, IpdDownloadInfo, + IpdDownloadStrategy, ipd_download_new_or_modified) from .location import Location from .logging import PrettyLogger, enable_logging from .organizer import Organizer @@ -294,6 +296,55 @@ class Pferd(Location): return organizer + @swallow_and_print_errors + def ipd_kit( + self, + target: Union[PathLike, Organizer], + url: str, + transform: Transform = lambda x: x, + download_strategy: IpdDownloadStrategy = ipd_download_new_or_modified, + clean: bool = True + ) -> Organizer: + """ + Synchronizes a folder with a DIVA playlist. + + Arguments: + target {Union[PathLike, Organizer]} -- The organizer / target folder to use. + url {str} -- the url to the page + + Keyword Arguments: + transform {Transform} -- A transformation function for the output paths. Return None + to ignore a file. (default: {lambdax:x}) + download_strategy {DivaDownloadStrategy} -- A function to determine which files need to + be downloaded. Can save bandwidth and reduce the number of requests. + (default: {diva_download_new}) + clean {bool} -- Whether to clean up when the method finishes. + """ + tmp_dir = self._tmp_dir.new_subdir() + + if target is None: + PRETTY.starting_synchronizer("None", "IPD", url) + raise FatalException("Got 'None' as target directory, aborting") + + if isinstance(target, Organizer): + organizer = target + else: + organizer = Organizer(self.resolve(to_path(target))) + + PRETTY.starting_synchronizer(organizer.path, "IPD", url) + + elements: List[IpdDownloadInfo] = IpdCrawler(url).crawl() + transformed = apply_transform(transform, elements) + + if self._test_run: + self._print_transformables(transformed) + return organizer + + downloader = IpdDownloader(tmp_dir=tmp_dir, organizer=organizer, strategy=download_strategy) + downloader.download_all(transformed) + + return organizer + @swallow_and_print_errors def diva_kit( self,