diff --git a/PFERD/cli/command_kit_ipd.py b/PFERD/cli/command_kit_ipd.py
index 480cc9b..c4c593f 100644
--- a/PFERD/cli/command_kit_ipd.py
+++ b/PFERD/cli/command_kit_ipd.py
@@ -35,7 +35,7 @@ def load(
     log.explain("Creating config for command 'kit-ipd'")
     parser["crawl:kit-ipd"] = {}
-    section = parser["crawl:ipd"]
+    section = parser["crawl:kit-ipd"]
     load_crawler(args, section)
     section["type"] = "kit-ipd"
diff --git a/PFERD/crawl/kit_ipd_crawler.py b/PFERD/crawl/kit_ipd_crawler.py
index 1ed5ffe..76145b4 100644
--- a/PFERD/crawl/kit_ipd_crawler.py
+++ b/PFERD/crawl/kit_ipd_crawler.py
@@ -3,7 +3,7 @@ import re
 from dataclasses import dataclass
 from pathlib import PurePath
 from re import Pattern
-from typing import List, Set, Union, AnyStr, Optional
+from typing import Awaitable, List, Optional, Set, Union
 from urllib.parse import urljoin
 
 from bs4 import BeautifulSoup, Tag
@@ -27,12 +27,12 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
 
         return target
 
-    def link_regex(self) -> Pattern[AnyStr]:
-        regex = self.s.get("link_regex", "^.*/[^/]*\.(?:pdf|zip|c|java)$")
+    def link_regex(self) -> Pattern[str]:
+        regex = self.s.get("link_regex", r"^.*/[^/]*\.(?:pdf|zip|c|java)$")
         return re.compile(regex)
 
 
-@dataclass
+@dataclass(unsafe_hash=True)
 class KitIpdFile:
     name: str
     url: str
@@ -43,6 +43,14 @@ class KitIpdFolder:
     name: str
     files: List[KitIpdFile]
 
+    def explain(self) -> None:
+        log.explain_topic(f"Folder {self.name!r}")
+        for file in self.files:
+            log.explain(f"File {file.name!r}")
+
+    def __hash__(self) -> int:
+        return self.name.__hash__()
+
 
 class KitIpdCrawler(HttpCrawler):
@@ -61,13 +69,15 @@ class KitIpdCrawler(HttpCrawler):
         if not maybe_cl:
             return
 
-        folders: List[KitIpdFolder] = []
+        tasks: List[Awaitable[None]] = []
 
         async with maybe_cl:
-            folder_tags = await self._fetch_folder_tags()
-            folders = [self._extract_folder(tag) for tag in folder_tags]
-
-            tasks = [self._crawl_folder(folder) for folder in folders]
+            for item in await self._fetch_items():
+                if isinstance(item, KitIpdFolder):
+                    tasks.append(self._crawl_folder(item))
+                else:
+                    # Orphan files are placed in the root folder
+                    tasks.append(self._download_file(PurePath("."), item))
 
         await self.gather(tasks)
@@ -89,51 +99,42 @@ class KitIpdCrawler(HttpCrawler):
         async with maybe_dl as (bar, sink):
             await self._stream_from_url(file.url, sink, bar)
 
-    async def _fetch_folder_tags(self) -> Set[Tag]:
+    async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
         page = await self.get_page()
         elements: List[Tag] = self._find_file_links(page)
-        folder_tags: Set[Tag] = set()
+        items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
 
         for element in elements:
-            folder_label = self._fetch_folder_label(element)
-            if folder_label is None:
-                folder_tags.add(page)
+            folder_label = self._find_folder_label(element)
+            if folder_label:
+                folder = self._extract_folder(folder_label)
+                if folder not in items:
+                    items.add(folder)
+                    folder.explain()
             else:
-                folder_tags.add(folder_label)
+                file = self._extract_file(element)
+                items.add(file)
+                log.explain_topic(f"Orphan file {file.name!r}")
+                log.explain("Attributing it to root folder")
 
-        return folder_tags
+        return items
 
     def _extract_folder(self, folder_tag: Tag) -> KitIpdFolder:
         files: List[KitIpdFile] = []
-        # if files have found outside a regular table
-        if not folder_tag.name.startswith("h"):
-            name = "."
-            root_links = filter(lambda f: self._fetch_folder_label(f) is None, self._find_file_links(folder_tag))
-            for link in root_links:
-                files.append(self._extract_file(link))
+        name = folder_tag.getText().strip()
-        else:
-            name = folder_tag.getText().strip()
-            container: Tag = folder_tag.findNextSibling(name="table")
-            for link in self._find_file_links(container):
-                files.append(self._extract_file(link))
-
-        log.explain_topic(f"Found folder {name!r}")
-        for file in files:
-            log.explain(f"Found file {file.name!r}")
+        container: Tag = folder_tag.findNextSibling(name="table")
+        for link in self._find_file_links(container):
+            files.append(self._extract_file(link))
 
         return KitIpdFolder(name, files)
 
     @staticmethod
-    def _fetch_folder_label(file_link: Tag) -> Optional[Tag]:
+    def _find_folder_label(file_link: Tag) -> Optional[Tag]:
         enclosing_table: Tag = file_link.findParent(name="table")
         if enclosing_table is None:
             return None
-        label: Tag = enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
-        if label is None:
-            return None
-        else:
-            return label
+        return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
 
     def _extract_file(self, link: Tag) -> KitIpdFile:
         url = self._abs_url_from_link(link)