diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6c9cb9..12cda26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,7 @@ ambiguous situations.
- Remove videos from description pages
- Perform ILIAS cycle detection after processing the transform to allow
ignoring duplicated elements
+- Parse headings (h1-h3) as folders in kit-ipd crawler
### Fixed
- Personal desktop/dashboard/favorites crawling
diff --git a/PFERD/crawl/http_crawler.py b/PFERD/crawl/http_crawler.py
index 39b22f3..fe8a360 100644
--- a/PFERD/crawl/http_crawler.py
+++ b/PFERD/crawl/http_crawler.py
@@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional, Tuple
import aiohttp
import certifi
from aiohttp.client import ClientTimeout
+from bs4 import Tag
from ..auth import Authenticator
from ..config import Config
@@ -172,6 +173,31 @@ class HttpCrawler(Crawler):
log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}")
log.warn(str(e))
+ @staticmethod
+ def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath:
+ """
+ Retrieves the hierarchy of headings associated with the given file link and constructs a folder
+ structure from them.
+
+ <h1> level headings usually only appear once and serve as the page title, so they would introduce
+ redundant nesting. To avoid this, <h1> headings can be ignored via the drop_h1 parameter.
+ """
+
+ def find_associated_headings(tag: Tag, level: int) -> PurePath:
+ if level == 0 or (level == 1 and drop_h1):
+ return PurePath()
+
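+ # find the closest heading of the current level that precedes the file link in the document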
+ level_heading = tag.find_previous(name=f"h{level}")
+
+ if level_heading is None:
+ return find_associated_headings(tag, level - 1)
+
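+ # use the heading text as a folder name and continue the search above this heading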
+ folder_name = level_heading.getText().strip()
+ return find_associated_headings(level_heading, level - 1) / folder_name
+
+ # start at level 3 because paragraph-level headings are usually too granular for folder names
+ return find_associated_headings(file_link, 3)
+
def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]:
"""
If available, retrieves the entity tag for a given path which was stored in the previous report.
diff --git a/PFERD/crawl/kit_ipd_crawler.py b/PFERD/crawl/kit_ipd_crawler.py
index d9515e2..e1d13a7 100644
--- a/PFERD/crawl/kit_ipd_crawler.py
+++ b/PFERD/crawl/kit_ipd_crawler.py
@@ -3,7 +3,7 @@ import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePath
-from typing import Awaitable, List, Optional, Pattern, Set, Tuple, Union
+from typing import Any, Awaitable, Generator, Iterable, List, Optional, Pattern, Tuple, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
@@ -32,24 +32,24 @@ class KitIpdCrawlerSection(HttpCrawlerSection):
return re.compile(regex)
-@dataclass(unsafe_hash=True)
+@dataclass
class KitIpdFile:
name: str
url: str
+ def explain(self) -> None:
+ log.explain(f"File {self.name!r} (href={self.url!r})")
+
@dataclass
class KitIpdFolder:
name: str
- files: List[KitIpdFile]
+ entries: List[Union[KitIpdFile, "KitIpdFolder"]]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
- for file in self.files:
- log.explain(f"File {file.name!r} (href={file.url!r})")
-
- def __hash__(self) -> int:
- return self.name.__hash__()
+ for entry in self.entries:
+ entry.explain()
class KitIpdCrawler(HttpCrawler):
@@ -73,28 +73,33 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_cl:
for item in await self._fetch_items():
+ item.explain()
if isinstance(item, KitIpdFolder):
- tasks.append(self._crawl_folder(item))
+ tasks.append(self._crawl_folder(PurePath("."), item))
else:
+ log.explain_topic(f"Orphan file {item.name!r} (href={item.url!r})")
+ log.explain("Attributing it to root folder")
# do this here to at least be sequential and not parallel (rate limiting is hard, as the
# crawl abstraction does not hold for these requests)
etag, mtime = await self._request_resource_version(item.url)
- # Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item, etag, mtime))
await self.gather(tasks)
- async def _crawl_folder(self, folder: KitIpdFolder) -> None:
- path = PurePath(folder.name)
+ async def _crawl_folder(self, parent: PurePath, folder: KitIpdFolder) -> None:
+ path = parent / folder.name
if not await self.crawl(path):
return
tasks = []
- for file in folder.files:
- # do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
- # abstraction does not hold for these requests)
- etag, mtime = await self._request_resource_version(file.url)
- tasks.append(self._download_file(path, file, etag, mtime))
+ for entry in folder.entries:
+ if isinstance(entry, KitIpdFolder):
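+ # recurse into subfolders so nested headings turn into nested directories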
+ tasks.append(self._crawl_folder(path, entry))
+ else:
+ # do this here to at least be sequential and not parallel (rate limiting is hard, as the crawl
+ # abstraction does not hold for these requests)
+ etag, mtime = await self._request_resource_version(entry.url)
+ tasks.append(self._download_file(path, entry, etag, mtime))
await self.gather(tasks)
@@ -120,42 +125,31 @@ class KitIpdCrawler(HttpCrawler):
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, element_path, sink, bar)
- async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
+ async def _fetch_items(self) -> Iterable[Union[KitIpdFile, KitIpdFolder]]:
page, url = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
- items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
+ # do not add unnecessary nesting for a single <h1> heading
+ drop_h1: bool = len(page.find_all(name="h1")) <= 1
+
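+ # synthetic root folder that collects all items; only its entries are returned at the end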
+ folder_tree: KitIpdFolder = KitIpdFolder(".", [])
for element in elements:
- folder_label = self._find_folder_label(element)
- if folder_label:
- folder = self._extract_folder(folder_label, url)
- if folder not in items:
- items.add(folder)
- folder.explain()
- else:
- file = self._extract_file(element, url)
- items.add(file)
- log.explain_topic(f"Orphan file {file.name!r} (href={file.url!r})")
- log.explain("Attributing it to root folder")
+ parent = HttpCrawler.get_folder_structure_from_heading_hierarchy(element, drop_h1)
+ file = self._extract_file(element, url)
- return items
+ current_folder: KitIpdFolder = folder_tree
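+ # walk down the folder tree along the heading-derived path, creating missing folders on the way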
+ for folder_name in parent.parts:
+ # helps the type checker to verify that current_folder is indeed a folder
+ def subfolders() -> Generator[KitIpdFolder, Any, None]:
+ return (entry for entry in current_folder.entries if isinstance(entry, KitIpdFolder))
- def _extract_folder(self, folder_tag: Tag, url: str) -> KitIpdFolder:
- files: List[KitIpdFile] = []
- name = folder_tag.getText().strip()
+ if not any(entry.name == folder_name for entry in subfolders()):
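+ # create the subfolder the first time this heading is encountered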
+ current_folder.entries.append(KitIpdFolder(folder_name, []))
+ current_folder = next(entry for entry in subfolders() if entry.name == folder_name)
- container: Tag = folder_tag.findNextSibling(name="table")
- for link in self._find_file_links(container):
- files.append(self._extract_file(link, url))
+ current_folder.entries.append(file)
- return KitIpdFolder(name, files)
-
- @staticmethod
- def _find_folder_label(file_link: Tag) -> Optional[Tag]:
- enclosing_table: Tag = file_link.findParent(name="table")
- if enclosing_table is None:
- return None
- return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
+ return folder_tree.entries
def _extract_file(self, link: Tag, url: str) -> KitIpdFile:
url = self._abs_url_from_link(url, link)