pferd/PFERD/ipd.py

152 lines
4.5 KiB
Python
Raw Normal View History

2020-11-03 20:40:09 +01:00
"""
Utility functions and a scraper/downloader for the IPD pages.
"""
import datetime
import logging
import math
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, List, Optional
from urllib.parse import urljoin
import bs4
import requests
from PFERD.errors import FatalException
from PFERD.utils import soupify
from .logging import PrettyLogger
from .organizer import Organizer
from .tmp_dir import TmpDir
from .transform import Transformable
from .utils import stream_to_path
# Module-level logger, plus a PrettyLogger wrapper around it that the code
# below uses for its "ignored file" and warning messages.
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
@dataclass
class IpdDownloadInfo(Transformable):
    """
    Describes a single downloadable entry found on an IPD page.

    Instances are built by the crawler in this module with a positional
    ``Path`` followed by the fields declared here.
    NOTE(review): the ``path`` attribute read elsewhere in this module is
    presumably declared on ``Transformable`` - confirm against that class.
    """

    # Absolute URL the file can be fetched from.
    url: str

    # Date listed for the entry on the page, or None if none could be parsed.
    modification_date: Optional[datetime.datetime]
# A download strategy receives the organizer and the info of one entry and
# returns True if that entry should be downloaded, False if it should be skipped.
IpdDownloadStrategy = Callable[[Organizer, IpdDownloadInfo], bool]
def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) -> bool:
    """
    Decide whether an entry should be downloaded.

    An entry is accepted if there is no local file for it yet, or if its
    modification date is strictly newer than the local file's mtime. If the
    local file exists and the entry carries no modification date, or the
    local file is at least as recent, the entry is reported as ignored and
    rejected.

    Returns True if the entry should be downloaded, False otherwise.
    """
    local_file = organizer.resolve(info.path)

    # Nothing on disk yet, so the entry counts as new.
    if not local_file.exists():
        return True

    remote_date = info.modification_date
    if not remote_date:
        # Without a date there is nothing to compare against the local file.
        PRETTY.ignored_file(info.path, "could not find modification time, file exists")
        return False

    local_mtime_seconds = local_file.stat().st_mtime
    remote_is_newer = remote_date.timestamp() > local_mtime_seconds
    if not remote_is_newer:
        PRETTY.ignored_file(info.path, "local file has newer or equal modification time")

    return remote_is_newer
class IpdCrawler:
    # pylint: disable=too-few-public-methods
    """
    A crawler for IPD pages.
    """

    def __init__(self, base_url: str):
        """
        Create a crawler for the page located at ``base_url``.
        """
        self._base_url = base_url

    def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
        """
        Create an absolute url from an <a> tag.
        """
        return urljoin(self._base_url, link_tag.get("href"))

    @staticmethod
    def _modification_date_from_link(link_tag: bs4.Tag) -> Optional[datetime.datetime]:
        """
        Try to read the modification date belonging to a link.

        The date is taken from the first <td> of the table row enclosing the
        link and must have the form ``DD.MM.YYYY``. Returns None if the link
        is not inside a table row, the row has no <td> cell, or the cell text
        is not a valid date.
        """
        enclosing_row: Optional[bs4.Tag] = link_tag.find_parent(name="tr")
        if enclosing_row is None:
            return None

        # A row consisting only of <th> cells has no <td>; find() then returns
        # None and must not be dereferenced.
        date_cell = enclosing_row.find(name="td")
        if date_cell is None:
            return None

        try:
            # Strip whitespace so that padded cell contents still parse.
            return datetime.datetime.strptime(date_cell.text.strip(), "%d.%m.%Y")
        except ValueError:
            return None

    def crawl(self) -> List[IpdDownloadInfo]:
        """
        Crawls the IPD page given in the constructor.

        Every <a> tag whose href ends in "pdf" yields one entry. The entry is
        named after the last path segment of the href, points to the absolute
        url of the link, and carries the modification date found in the
        enclosing table row (if any).

        Returns the list of found entries in page order.
        """
        page = soupify(requests.get(self._base_url))

        items: List[IpdDownloadInfo] = []

        for link in page.find_all(name="a", attrs={"href": lambda x: x and x.endswith("pdf")}):
            href: str = link.attrs.get("href")
            name = href.split("/")[-1]

            items.append(IpdDownloadInfo(
                Path(name),
                url=self._abs_url_from_link(link),
                modification_date=self._modification_date_from_link(link)
            ))

        return items
class IpdDownloader:
    """
    Downloads ipd files into an organizer, subject to a download strategy.
    """

    def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy):
        """
        Store the collaborators and open a requests session for all downloads.
        """
        self._tmp_dir = tmp_dir
        self._organizer = organizer
        self._strategy = strategy
        self._session = requests.session()

    def download_all(self, infos: List[IpdDownloadInfo]) -> None:
        """
        Download the given entries sequentially, in list order.
        """
        for entry in infos:
            self.download(entry)

    def download(self, info: IpdDownloadInfo) -> None:
        """
        Download one entry if the strategy accepts it.

        A rejected entry is only marked in the organizer. An accepted entry
        is streamed to a temporary path and handed to the organizer; if the
        organizer returns a destination and the entry has a modification
        date, the destination's access and modification times are set to
        that date (rounded up to whole seconds).

        Raises FatalException on a 403 response. Any other non-200 response
        is logged as a warning.
        """
        wanted = self._strategy(self._organizer, info)
        if not wanted:
            self._organizer.mark(info.path)
            return

        with self._session.get(info.url, stream=True) as response:
            status = response.status_code

            if status == 403:
                raise FatalException("Received 403. Are you not using the KIT VPN?")

            if status != 200:
                PRETTY.warning(f"Could not download file, got response {response.status_code}")
                return

            tmp_file = self._tmp_dir.new_path()
            stream_to_path(response, tmp_file, info.path.name)
            dst_path = self._organizer.accept_file(tmp_file, info.path)

            if not (dst_path and info.modification_date):
                return

            # Same rounded-up timestamp for both access and modification time.
            stamp = math.ceil(info.modification_date.timestamp())
            os.utime(dst_path, times=(stamp, stamp))