pferd/PFERD/crawl/kit_ipd_crawler.py

139 lines
4.2 KiB
Python
Raw Normal View History

2021-10-21 12:01:41 +02:00
import os
from dataclasses import dataclass
from pathlib import PurePath
from typing import List, Set, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
from ..config import Config
from ..logging import ProgressBar, log
from ..output_dir import FileSink
from ..utils import soupify
from .crawler import CrawlError
from .http_crawler import HttpCrawler, HttpCrawlerSection
class KitIpdCrawlerSection(HttpCrawlerSection):
    """Config section describing a KIT IPD crawler."""

    def target(self) -> str:
        """Return the ``target`` option of this section after validating it.

        A falsy (absent or empty) value is reported via ``missing_value``; a
        value that does not start with ``https://`` is reported via
        ``invalid_value``. Both helpers come from the base class, which is not
        shown here — presumably they raise instead of returning.
        """
        key = "target"
        url = self.s.get(key)

        if not url:
            self.missing_value(key)

        # Happy path first: anything with an https scheme is accepted as-is.
        if url.startswith("https://"):
            return url

        self.invalid_value(key, url, "Should be a URL")
        return url
@dataclass
class KitIpdFile:
    """A single downloadable file found on the crawled page."""

    # File name to store the download under.
    name: str
    # URL the file is fetched from.
    url: str
@dataclass
class KitIpdFolder:
    """A named group of files found on the crawled page."""

    # Folder name; used as the output directory for the contained files.
    name: str
    # Files that belong to this folder.
    files: List[KitIpdFile]
class KitIpdCrawler(HttpCrawler):
    """Crawler that downloads the files linked from a single KIT IPD page.

    The page at the configured target URL is searched for ``<a>`` tags whose
    ``href`` contains ``"intern"``. Every such link is expected to live in a
    ``<td>`` cell whose preceding ``<td>`` sibling holds the folder label. The
    links found next to a label are downloaded into a folder named after it.
    """

    def __init__(
        self,
        name: str,
        section: KitIpdCrawlerSection,
        config: Config,
    ):
        """Initialise the base crawler and remember the validated target URL."""
        super().__init__(name, section, config)
        self._url = section.target()

    async def _run(self) -> None:
        """Fetch the target page, collect its folders and crawl each of them."""
        maybe_cl = await self.crawl(PurePath("."))
        if not maybe_cl:
            return

        folders: List[KitIpdFolder] = []

        # Only fetching and parsing the page happens inside the context
        # manager returned by crawl(); the per-folder work is gathered after
        # it has been exited.
        async with maybe_cl:
            folder_tags = await self._fetch_folder_tags()
            folders = [self._extract_folder(tag) for tag in folder_tags]

        tasks = [self._crawl_folder(folder) for folder in folders]
        await self.gather(tasks)

    async def _crawl_folder(self, folder: KitIpdFolder) -> None:
        """Download every file of ``folder`` unless ``crawl`` rejects its path."""
        path = PurePath(folder.name)
        if not await self.crawl(path):
            return

        tasks = [self._download_file(path, file) for file in folder.files]
        await self.gather(tasks)

    async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
        """Download ``file`` to ``parent / file.name`` unless ``download`` rejects it."""
        element_path = parent / file.name
        maybe_dl = await self.download(element_path)
        if not maybe_dl:
            return

        async with maybe_dl as (bar, sink):
            await self._stream_from_url(file.url, sink, bar)

    async def _fetch_folder_tags(self) -> Set[Tag]:
        """Return the label cells of all table rows that contain file links.

        For every file link on the page, the enclosing ``<td>`` is located and
        the ``<td>`` sibling before it is taken as the folder label. Links
        that are not inside such a labelled row are skipped.
        """
        page = await self.get_page()
        folder_tags: Set[Tag] = set()

        for link in self._find_file_links(page):
            enclosing_data = link.find_parent(name="td")
            label = None
            if enclosing_data is not None:
                label = enclosing_data.find_previous_sibling(name="td")

            # Both lookups return None when nothing matches. Such a link has
            # no folder it could be attributed to, so skip it instead of
            # failing the whole crawl with an AttributeError.
            if label is None:
                log.explain(f"Skipping link {link.get('href')!r}: no folder label found")
                continue

            folder_tags.add(label)

        return folder_tags

    def _extract_folder(self, folder_tag: Tag) -> KitIpdFolder:
        """Build a folder from a label cell and the links in the next ``<td>``.

        The folder name is the stripped text of ``folder_tag``. A label
        without a following ``<td>`` yields a folder without files.
        """
        name = folder_tag.get_text().strip()

        container = folder_tag.find_next_sibling(name="td")
        links = self._find_file_links(container) if container is not None else []
        files = [self._extract_file(link) for link in links]

        log.explain_topic(f"Found folder {name!r}")
        for file in files:
            log.explain(f"Found file {file.name!r}")

        return KitIpdFolder(name, files)

    def _extract_file(self, link: Tag) -> KitIpdFile:
        """Build a file from a link tag.

        The file name is the stripped link text followed by the extension of
        the linked document; the URL is the link's ``href`` made absolute.
        """
        # Imported locally so that this block does not depend on a change to
        # the module-level import list.
        from urllib.parse import urlsplit

        name = link.get_text().strip()
        url = self._abs_url_from_link(link)

        # Take the extension from the path component only; otherwise a query
        # string or fragment would end up in the file name ("x.pdf?dl=1").
        _, extension = os.path.splitext(urlsplit(url).path)
        return KitIpdFile(name + extension, url)

    def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
        """Return all ``<a>`` tags below ``tag`` whose ``href`` contains ``"intern"``."""
        return tag.find_all(name="a", attrs={"href": lambda x: x and "intern" in x})

    def _abs_url_from_link(self, link_tag: Tag) -> str:
        """Resolve the ``href`` of ``link_tag`` against the crawler's target URL."""
        return urljoin(self._url, link_tag.get("href"))

    async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
        """Stream the response body of ``url`` into ``sink`` while advancing ``bar``.

        Redirects are not followed. The body is written in chunks of up to
        1024 bytes, and ``sink.done()`` is called once all chunks are written.

        Raises:
            CrawlError: if the server answers with status 403.
        """
        async with self.session.get(url, allow_redirects=False) as resp:
            if resp.status == 403:
                raise CrawlError("Received a 403. Are you within the KIT network/VPN?")

            # The total is only set when the response reports a length.
            if resp.content_length:
                bar.set_total(resp.content_length)

            async for data in resp.content.iter_chunked(1024):
                sink.file.write(data)
                bar.advance(len(data))

            sink.done()

    async def get_page(self) -> BeautifulSoup:
        """Fetch the target URL and return its body parsed by ``soupify``."""
        async with self.session.get(self._url) as request:
            return soupify(await request.read())