mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
151 lines
4.4 KiB
Python
151 lines
4.4 KiB
Python
"""
|
|
Utility functions and a scraper/downloader for the IPD pages.
|
|
"""
|
|
import datetime
|
|
import logging
|
|
import math
|
|
import os
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Callable, List, Optional
|
|
from urllib.parse import urljoin
|
|
|
|
import bs4
|
|
import requests
|
|
|
|
from PFERD.errors import FatalException
|
|
from PFERD.utils import soupify
|
|
|
|
from .logging import PrettyLogger
|
|
from .organizer import Organizer
|
|
from .tmp_dir import TmpDir
|
|
from .transform import Transformable
|
|
from .utils import stream_to_path
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
PRETTY = PrettyLogger(LOGGER)
|
|
|
|
|
|
@dataclass
|
|
class IpdDownloadInfo(Transformable):
|
|
"""
|
|
Information about an ipd entry.
|
|
"""
|
|
url: str
|
|
modification_date: Optional[datetime.datetime]
|
|
|
|
|
|
IpdDownloadStrategy = Callable[[Organizer, IpdDownloadInfo], bool]
|
|
|
|
|
|
def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) -> bool:
|
|
"""
|
|
Accepts new files or files with a more recent modification date.
|
|
"""
|
|
resolved_file = organizer.resolve(info.path)
|
|
if not resolved_file.exists():
|
|
return True
|
|
if not info.modification_date:
|
|
PRETTY.ignored_file(info.path, "could not find modification time, file exists")
|
|
return False
|
|
|
|
resolved_mod_time_seconds = resolved_file.stat().st_mtime
|
|
|
|
# Download if the info is newer
|
|
if info.modification_date.timestamp() > resolved_mod_time_seconds:
|
|
return True
|
|
|
|
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
|
|
return False
|
|
|
|
|
|
class IpdCrawler:
|
|
# pylint: disable=too-few-public-methods
|
|
"""
|
|
A crawler for IPD pages.
|
|
"""
|
|
|
|
def __init__(self, base_url: str):
|
|
self._base_url = base_url
|
|
|
|
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
|
|
"""
|
|
Create an absolute url from an <a> tag.
|
|
"""
|
|
return urljoin(self._base_url, link_tag.get("href"))
|
|
|
|
def crawl(self) -> List[IpdDownloadInfo]:
|
|
"""
|
|
Crawls the playlist given in the constructor.
|
|
"""
|
|
page = soupify(requests.get(self._base_url))
|
|
|
|
items: List[IpdDownloadInfo] = []
|
|
|
|
for link in page.findAll(name="a", attrs={"href": lambda x: x and x.endswith("pdf")}):
|
|
href: str = link.attrs.get("href")
|
|
name = href.split("/")[-1]
|
|
|
|
modification_date: Optional[datetime.datetime]
|
|
try:
|
|
enclosing_row: bs4.Tag = link.findParent(name="tr")
|
|
date_text = enclosing_row.find(name="td").text
|
|
modification_date = datetime.datetime.strptime(date_text, "%d.%m.%Y")
|
|
except ValueError:
|
|
modification_date = None
|
|
|
|
items.append(IpdDownloadInfo(
|
|
Path(name),
|
|
url=self._abs_url_from_link(link),
|
|
modification_date=modification_date
|
|
))
|
|
|
|
return items
|
|
|
|
|
|
class IpdDownloader:
|
|
"""
|
|
A downloader for ipd files.
|
|
"""
|
|
|
|
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy):
|
|
self._tmp_dir = tmp_dir
|
|
self._organizer = organizer
|
|
self._strategy = strategy
|
|
self._session = requests.session()
|
|
|
|
def download_all(self, infos: List[IpdDownloadInfo]) -> None:
|
|
"""
|
|
Download multiple files one after the other.
|
|
"""
|
|
for info in infos:
|
|
self.download(info)
|
|
|
|
def download(self, info: IpdDownloadInfo) -> None:
|
|
"""
|
|
Download a single file.
|
|
"""
|
|
if not self._strategy(self._organizer, info):
|
|
self._organizer.mark(info.path)
|
|
return
|
|
|
|
with self._session.get(info.url, stream=True) as response:
|
|
if response.status_code == 200:
|
|
tmp_file = self._tmp_dir.new_path()
|
|
stream_to_path(response, tmp_file, info.path.name)
|
|
dst_path = self._organizer.accept_file(tmp_file, info.path)
|
|
|
|
if dst_path and info.modification_date:
|
|
os.utime(
|
|
dst_path,
|
|
times=(
|
|
math.ceil(info.modification_date.timestamp()),
|
|
math.ceil(info.modification_date.timestamp())
|
|
)
|
|
)
|
|
|
|
elif response.status_code == 403:
|
|
raise FatalException("Received 403. Are you not using the KIT VPN?")
|
|
else:
|
|
PRETTY.warning(f"Could not download file, got response {response.status_code}")
|