Compare commits

...

32 Commits

Author SHA1 Message Date
0045124a4e Bump version to 3.3.0 2022-01-09 21:09:09 +01:00
9618aae83b Add content pages to changelog 2022-01-09 18:32:58 +01:00
33453ede2d Update dependency versions in setup.py 2022-01-09 18:31:42 +01:00
e467b38d73 Only reject 1970 timestamps on windows 2022-01-09 18:23:00 +01:00
e9d2d05030 Update changelog 2022-01-09 11:48:26 +01:00
4bf0c972e6 Update types for rich 11 2022-01-09 11:48:26 +01:00
4ee919625d Add rudimentary support for content pages 2022-01-08 20:47:35 +01:00
d30f25ee97 Detect shib login page as login page
And do not assume we are logged in...
2022-01-08 20:28:45 +01:00
10d9d74528 Bail out when crawling recursive courses 2022-01-08 20:28:30 +01:00
43c5453e10 Correctly crawl files on desktop
The files on the desktop do not include a download link, so we need to
rewrite it.
2022-01-08 20:00:53 +01:00
eb4de8ae0c Ignore 1970 dates as windows crashes when calling .timestamp() 2022-01-08 18:14:43 +01:00
e32c1f000f Fix mtime for single streams 2022-01-08 18:05:48 +01:00
5f527bc697 Remove Python 3.9 Pattern typehints 2022-01-08 17:14:40 +01:00
ced8b9a2d0 Fix some accordions 2022-01-08 16:58:30 +01:00
6f3cfd4396 Fix personal desktop crawling 2022-01-08 16:58:15 +01:00
462d993fbc Fix local video path cache (hopefully) 2022-01-08 00:27:48 +01:00
a99356f2a2 Fix video stream extraction 2022-01-08 00:27:34 +01:00
eac2e34161 Fix is_logged_in for ILIAS 7 2022-01-07 23:32:31 +01:00
a82a0b19c2 Collect crawler warnings/errors and include them in the report 2021-11-07 21:48:55 +01:00
90cb6e989b Do not download single videos if cache does not exist 2021-11-06 23:21:15 +01:00
6289938d7c Do not stop crawling files when encountering a CrawlWarning 2021-11-06 12:09:51 +01:00
13b8c3d9c6 Add regex option to config and CLI parser 2021-11-02 09:30:46 +01:00
88afe64a92 Refactor IPD crawler a bit 2021-11-02 01:25:01 +00:00
6b2a657573 Fix IPD crawler for different subpages (#42)
This patch reworks the IPD crawler to support subpages which do not use
"/intern" for links and fetches the folder names from table headings.
2021-11-02 01:25:01 +00:00
d6f38a61e1 Fixed minor spelling mistakes 2021-11-02 01:54:00 +01:00
ad3f4955f7 Update changelog 2021-10-30 18:14:39 +02:00
e42ab83d32 Add support for ILIAS cards 2021-10-30 18:13:44 +02:00
f9a3f9b9f2 Handle multi-stream videos 2021-10-30 18:12:29 +02:00
ef7d5ea2d3 Allow storing crawler-specific data in reports 2021-10-30 18:09:05 +02:00
55ea304ff3 Disable interpolation of ConfigParser 2021-10-25 23:37:42 +02:00
fee12b3d9e Fix changelog 2021-10-25 17:44:12 +00:00
6673077397 Add kit-ipd crawler 2021-10-21 13:20:21 +02:00
17 changed files with 662 additions and 50 deletions

View File

@ -22,6 +22,23 @@ ambiguous situations.
## Unreleased
## 3.3.0 - 2022-01-09
### Added
- A KIT IPD crawler
- Support for ILIAS cards
- (Rudimentary) support for content pages
- Support for multi-stream videos
- Support for ILIAS 7
### Removed
- [Interpolation](https://docs.python.org/3/library/configparser.html#interpolation-of-values) in config file
### Fixed
- Crawling of recursive courses
- Crawling files directly placed on the personal desktop
- Ignore timestamps at the unix epoch as they crash on windows
## 3.2.0 - 2021-08-04
### Added

View File

@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header,
which is followed by a list of `key = value` pairs. Comments must be on their
own line and start with `#`. Multiline values must be indented beyond their key.
Boolean values can be `yes` or `no`. For more details and some examples on the
format, see the [configparser documentation][1] ([basic interpolation][2] is
enabled).
format, see the [configparser documentation][1] ([interpolation][2] is
disabled).
[1]: <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure> "Supported INI File Structure"
[2]: <https://docs.python.org/3/library/configparser.html#configparser.BasicInterpolation> "BasicInterpolation"
[2]: <https://docs.python.org/3/library/configparser.html#interpolation-of-values> "Interpolation of values"
## The `DEFAULT` section
@ -53,7 +53,7 @@ common to all crawlers:
crawler can still be executed manually using the `--crawler` or `-C` flags.
(Default: `no`)
- `output_dir`: The directory the crawler synchronizes files to. A crawler will
never place any files outside of this directory. (Default: the crawler's name)
never place any files outside this directory. (Default: the crawler's name)
- `redownload`: When to download a file that is already present locally.
(Default: `never-smart`)
- `never`: If a file is present locally, it is not downloaded again.
@ -136,6 +136,18 @@ crawler simulate a slower, network-based crawler.
requests. (Default: `0.0`)
- `download_speed`: Download speed (in bytes per second) to simulate. (Optional)
### The `kit-ipd` crawler
This crawler crawls a KIT-IPD page by url. The root page can be crawled from
outside the KIT network so you will be informed about any new/deleted files,
but downloading files requires you to be within. Adding a show delay between
requests is likely a good idea.
- `target`: URL to a KIT-IPD page
- `link_regex`: A regex that is matched against the `href` part of links. If it
matches, the given link is downloaded as a file. This is used to extract
files from KIT-IPD pages. (Default: `^.*/[^/]*\.(?:pdf|zip|c|java)$`)
### The `kit-ilias-web` crawler
This crawler crawls the KIT ILIAS instance.

View File

@ -1,4 +1,5 @@
Copyright 2019-2020 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim
Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw,
TheChristophe, Scriptim, thelukasprobst, Toorero
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in

View File

@ -15,7 +15,7 @@ from .transformer import RuleParseError
def load_config_parser(args: argparse.Namespace) -> configparser.ConfigParser:
log.explain_topic("Loading config")
parser = configparser.ConfigParser()
parser = configparser.ConfigParser(interpolation=None)
if args.command is None:
log.explain("No CLI command specified, loading config from file")

View File

@ -9,4 +9,5 @@
from . import command_local # noqa: F401 imported but unused
from . import command_kit_ilias_web # noqa: F401 imported but unused
from . import command_kit_ipd # noqa: F401 imported but unused
from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused

View File

@ -0,0 +1,54 @@
import argparse
import configparser
from pathlib import Path
from ..logging import log
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
SUBPARSER = SUBPARSERS.add_parser(
"kit-ipd",
parents=[CRAWLER_PARSER],
)
GROUP = SUBPARSER.add_argument_group(
title="kit ipd crawler arguments",
description="arguments for the 'kit-ipd' crawler",
)
GROUP.add_argument(
"--link-regex",
type=str,
metavar="REGEX",
help="href-matching regex to identify downloadable files"
)
GROUP.add_argument(
"target",
type=str,
metavar="TARGET",
help="url to crawl"
)
GROUP.add_argument(
"output",
type=Path,
metavar="OUTPUT",
help="output directory"
)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain("Creating config for command 'kit-ipd'")
parser["crawl:kit-ipd"] = {}
section = parser["crawl:kit-ipd"]
load_crawler(args, section)
section["type"] = "kit-ipd"
section["target"] = str(args.target)
section["output_dir"] = str(args.output)
if args.link_regex:
section["link_regex"] = str(args.link_regex)
SUBPARSER.set_defaults(command=load)

View File

@ -5,6 +5,7 @@ from ..auth import Authenticator
from ..config import Config
from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection
CrawlerConstructor = Callable[[
@ -19,4 +20,6 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
LocalCrawler(n, LocalCrawlerSection(s), c),
"kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a:
KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
}

View File

@ -47,10 +47,12 @@ def noncritical(f: Wrapped) -> Wrapped:
try:
f(*args, **kwargs)
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
crawler.report.add_warning(str(e))
log.warn(str(e))
crawler.error_free = False
except: # noqa: E722 do not use bare 'except'
except Exception as e:
crawler.error_free = False
crawler.report.add_error(str(e))
raise
return wrapper # type: ignore
@ -83,8 +85,10 @@ def anoncritical(f: AWrapped) -> AWrapped:
except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
log.warn(str(e))
crawler.error_free = False
except: # noqa: E722 do not use bare 'except'
crawler.report.add_warning(str(e))
except Exception as e:
crawler.error_free = False
crawler.report.add_error(str(e))
raise
return None

View File

@ -39,7 +39,12 @@ class IliasPageElement:
description: Optional[str] = None
def id(self) -> str:
regexes = [r"eid=(?P<id>[0-9a-z\-]+)", r"file_(?P<id>\d+)", r"ref_id=(?P<id>\d+)"]
regexes = [
r"eid=(?P<id>[0-9a-z\-]+)",
r"file_(?P<id>\d+)",
r"ref_id=(?P<id>\d+)",
r"target=[a-z]+_(?P<id>\d+)"
]
for regex in regexes:
if match := re.search(regex, self.url):
@ -71,6 +76,12 @@ class IliasPage:
if self._is_exercise_file():
log.explain("Page is an exercise, searching for elements")
return self._find_exercise_entries()
if self._is_personal_desktop():
log.explain("Page is the personal desktop, searching for elements")
return self._find_personal_desktop_entries()
if self._is_content_page():
log.explain("Page is a content page, searching for elements")
return self._find_copa_entries()
log.explain("Page is a normal folder, searching for elements")
return self._find_normal_entries()
@ -115,13 +126,22 @@ class IliasPage:
return False
def _is_personal_desktop(self) -> bool:
return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x})
def _is_content_page(self) -> bool:
link = self._soup.find(id="current_perma_link")
if not link:
return False
return "target=copa_" in link.get("value")
def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex: re.Pattern[str] = re.compile(
regex = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(self._soup))
@ -133,10 +153,64 @@ class IliasPage:
# parse it
json_object = json.loads(json_str)
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
streams = [stream for stream in json_object["streams"]]
# and just fetch the lone video url!
if len(streams) == 1:
video_url = streams[0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
log.explain(f"Found multiple videos for stream at {self._source_name}")
items = []
for stream in sorted(streams, key=lambda stream: stream["content"]):
full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4"
video_url = stream["sources"]["mp4"][0]["src"]
items.append(IliasPageElement(IliasElementType.VIDEO, video_url, full_name))
return items
def _find_personal_desktop_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
titles: List[Tag] = self._soup.select(".il-item-title")
for title in titles:
link = title.find("a")
name = _sanitize_path_name(link.text.strip())
url = self._abs_url_from_link(link)
type = self._find_type_from_link(name, link, url)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {link}")
continue
log.explain(f"Found {name!r}")
if type == IliasElementType.FILE and "_download" not in url:
url = re.sub(r"(target=file_\d+)", r"\1_download", url)
log.explain("Rewired file URL to include download part")
items.append(IliasPageElement(type, url, name))
return items
def _find_copa_entries(self) -> List[IliasPageElement]:
items: List[IliasPageElement] = []
links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink")
for link in links:
url = self._abs_url_from_link(link)
name = _sanitize_path_name(link.getText().strip().replace("\t", ""))
if "file_id" not in url:
_unexpected_html_warning()
log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}")
continue
items.append(IliasPageElement(IliasElementType.FILE, url, name))
return items
def _find_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
@ -356,6 +430,8 @@ class IliasPage:
log.explain(f"Found {element_name!r}")
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
result += self._find_cards()
return result
def _find_upwards_folder_hierarchy(self, tag: Tag) -> List[str]:
@ -383,7 +459,10 @@ class IliasPage:
continue
prev: Tag = parent.findPreviousSibling("div")
if "ilContainerBlockHeader" in prev.get("class"):
if prev.find("h3"):
found_titles.append(prev.find("h3").getText().strip())
else:
found_titles.append(prev.find("h2").getText().strip())
# And this for real accordions
if "il_VAccordionContentDef" in parent.get("class"):
@ -438,6 +517,90 @@ class IliasPage:
log.explain(f"Found file {full_path!r}")
return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
def _find_cards(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
card_titles: List[Tag] = self._soup.select(".card-title a")
for title in card_titles:
url = self._abs_url_from_link(title)
name = _sanitize_path_name(title.getText().strip())
type = self._find_type_from_card(title)
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {title}")
continue
result.append(IliasPageElement(type, url, name))
card_button_tiles: List[Tag] = self._soup.select(".card-title button")
for button in card_button_tiles:
regex = re.compile(button["id"] + r".*window.open\(['\"](.+?)['\"]")
res = regex.search(str(self._soup))
if not res:
_unexpected_html_warning()
log.warn_contd(f"Could not find click handler target for {button}")
continue
url = self._abs_url_from_relative(res.group(1))
name = _sanitize_path_name(button.getText().strip())
type = self._find_type_from_card(button)
caption_parent = button.findParent(
"div",
attrs={"class": lambda x: x and "caption" in x},
)
description = caption_parent.find_next_sibling("div").getText().strip()
if not type:
_unexpected_html_warning()
log.warn_contd(f"Could not extract type for {button}")
continue
result.append(IliasPageElement(type, url, name, description=description))
return result
def _find_type_from_card(self, card_title: Tag) -> Optional[IliasElementType]:
def is_card_root(element: Tag) -> bool:
return "il-card" in element["class"] and "thumbnail" in element["class"]
card_root: Optional[Tag] = None
# We look for the card root
for parent in card_title.parents:
if is_card_root(parent):
card_root = parent
break
if card_root is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an icon for {card_title}")
return None
icon: Tag = card_root.select_one(".il-card-repository-head .icon")
if "opencast" in icon["class"]:
return IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED
if "exc" in icon["class"]:
return IliasElementType.EXERCISE
if "webr" in icon["class"]:
return IliasElementType.LINK
if "book" in icon["class"]:
return IliasElementType.BOOKING
if "frm" in icon["class"]:
return IliasElementType.FORUM
if "sess" in icon["class"]:
return IliasElementType.MEETING
if "tst" in icon["class"]:
return IliasElementType.TEST
if "fold" in icon["class"]:
return IliasElementType.FOLDER
_unexpected_html_warning()
log.warn_contd(f"Could not extract type from {icon} for card title {card_title}")
return None
@staticmethod
def _find_type_from_link(
element_name: str,
@ -453,9 +616,30 @@ class IliasPage:
if "target=file_" in parsed_url.query:
return IliasElementType.FILE
if "target=grp_" in parsed_url.query:
return IliasElementType.FOLDER
if "target=crs_" in parsed_url.query:
return IliasElementType.FOLDER
if "baseClass=ilExerciseHandlerGUI" in parsed_url.query:
return IliasElementType.EXERCISE
if "baseClass=ilLinkResourceHandlerGUI" in parsed_url.query and "calldirectlink" in parsed_url.query:
return IliasElementType.LINK
if "cmd=showThreads" in parsed_url.query or "target=frm_" in parsed_url.query:
return IliasElementType.FORUM
if "cmdClass=ilobjtestgui" in parsed_url.query:
return IliasElementType.TEST
# Booking and Meeting can not be detected based on the link. They do have a ref_id though, so
# try to guess it from the image.
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query:
if "ref_id=" in parsed_url.query or "goto.php" in parsed_url.path:
return IliasPage._find_type_from_folder_like(link_element, url)
_unexpected_html_warning()
@ -476,7 +660,7 @@ class IliasPage:
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
if "ilContainerListItemOuter" in parent["class"] or "il-std-item" in parent["class"]:
found_parent = parent
break
@ -488,6 +672,9 @@ class IliasPage:
# Find the small descriptive icon to figure out the type
img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
img_tag = found_parent.select_one("img.icon")
if img_tag is None:
_unexpected_html_warning()
log.warn_contd(f"Tried to figure out element type, but did not find an image for {url}")
@ -538,7 +725,13 @@ class IliasPage:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._page_url, link_tag.get("href"))
return self._abs_url_from_relative(link_tag.get("href"))
def _abs_url_from_relative(self, relative_url: str) -> str:
"""
Create an absolute url from a relative URL.
"""
return urljoin(self._page_url, relative_url)
def _unexpected_html_warning() -> None:

View File

@ -1,7 +1,7 @@
import asyncio
import re
from pathlib import PurePath
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, TypeVar, Union
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, TypeVar, Union, cast
import aiohttp
from aiohttp import hdrs
@ -84,7 +84,7 @@ _VIDEO_ELEMENTS: Set[IliasElementType] = set([
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[Optional[Any]]])
def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]:
def decorator(f: AWrapped) -> AWrapped:
async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
last_exception: Optional[BaseException] = None
@ -105,6 +105,9 @@ def _iorepeat(attempts: int, name: str) -> Callable[[AWrapped], AWrapped]:
if last_exception:
message = f"Error in I/O Operation: {last_exception}"
if failure_is_error:
raise CrawlError(message) from last_exception
else:
raise CrawlWarning(message) from last_exception
raise CrawlError("Impossible return in ilias _iorepeat")
@ -179,6 +182,7 @@ instance's greatest bottleneck.
self._link_file_redirect_delay = section.link_redirect_delay()
self._links = section.links()
self._videos = section.videos()
self._visited_urls: Set[str] = set()
async def _run(self) -> None:
if isinstance(self._target, int):
@ -200,7 +204,9 @@ instance's greatest bottleneck.
await self._crawl_url(root_url, expected_id=course_id)
async def _crawl_desktop(self) -> None:
await self._crawl_url(self._base_url)
appendix = r"ILIAS\PersonalDesktop\PDMainBarProvider|mm_pd_sel_items"
appendix = appendix.encode("ASCII").hex()
await self._crawl_url(self._base_url + "/gs_content.php?item=" + appendix)
async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
maybe_cl = await self.crawl(PurePath("."))
@ -251,6 +257,7 @@ instance's greatest bottleneck.
return None
return self._crawl_ilias_page(url, parent, maybe_cl)
@anoncritical
async def _crawl_ilias_page(
self,
url: str,
@ -292,15 +299,23 @@ instance's greatest bottleneck.
# And execute them
await self.gather(tasks)
# These decorators only apply *to this method* and *NOT* to the returned
# awaitables!
# This method does not await the handlers but returns them instead.
# This ensures one level is handled at a time and name deduplication
# works correctly.
@anoncritical
# Shouldn't happen but we also really don't want to let I/O errors bubble up to anoncritical.
# If that happens we will be terminated as anoncritical doesn't tream them as non-critical.
@_wrap_io_in_warning("handling ilias element")
async def _handle_ilias_element(
self,
parent_path: PurePath,
element: IliasPageElement,
) -> Optional[Awaitable[None]]:
if element.url in self._visited_urls:
raise CrawlWarning(
f"Found second path to element {element.name!r} at {element.url!r}. Aborting subpath"
)
self._visited_urls.add(element.url)
element_path = PurePath(parent_path, element.name)
if element.type in _VIDEO_ELEMENTS:
@ -363,6 +378,7 @@ instance's greatest bottleneck.
return self._download_link(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(3, "resolving link")
async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None:
async with dl as (bar, sink):
@ -409,6 +425,7 @@ instance's greatest bottleneck.
return self._download_booking(element, link_template_maybe, maybe_dl)
@anoncritical
@_iorepeat(3, "resolving booking")
async def _download_booking(
self,
@ -439,22 +456,103 @@ instance's greatest bottleneck.
element: IliasPageElement,
element_path: PurePath,
) -> Optional[Awaitable[None]]:
# Videos will NOT be redownloaded - their content doesn't really change and they are chunky
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
# Copy old mapping as it is likely still relevant
if self.prev_report:
self.report.add_custom_value(
str(element_path),
self.prev_report.get_custom_value(str(element_path))
)
# A video might contain other videos, so let's "crawl" the video first
# to ensure rate limits apply. This must be a download as *this token*
# is re-used if the video consists of a single stream. In that case the
# file name is used and *not* the stream name the ilias html parser reported
# to ensure backwards compatibility.
maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS)
# If we do not want to crawl it (user filter) or we have every file
# from the cached mapping already, we can ignore this and bail
if not maybe_dl or self._all_videos_locally_present(element_path):
# Mark all existing cideos as known so they do not get deleted
# during dleanup. We "downloaded" them, just without actually making
# a network request as we assumed they did not change.
for video in self._previous_contained_videos(element_path):
await self.download(video)
return None
return self._download_video(element, maybe_dl)
return self._download_video(element_path, element, maybe_dl)
def _previous_contained_videos(self, video_path: PurePath) -> List[PurePath]:
if not self.prev_report:
return []
custom_value = self.prev_report.get_custom_value(str(video_path))
if not custom_value:
return []
names = cast(List[str], custom_value)
folder = video_path.parent
return [PurePath(folder, name) for name in names]
def _all_videos_locally_present(self, video_path: PurePath) -> bool:
if contained_videos := self._previous_contained_videos(video_path):
log.explain_topic(f"Checking local cache for video {video_path.name}")
all_found_locally = True
for video in contained_videos:
transformed_path = self._transformer.transform(video)
if transformed_path:
exists_locally = self._output_dir.resolve(transformed_path).exists()
all_found_locally = all_found_locally and exists_locally
if all_found_locally:
log.explain("Found all videos locally, skipping enumeration request")
return True
log.explain("Missing at least one video, continuing with requests!")
return False
@anoncritical
@_iorepeat(3, "downloading video")
async def _download_video(self, element: IliasPageElement, dl: DownloadToken) -> None:
async def _download_video(
self,
original_path: PurePath,
element: IliasPageElement,
dl: DownloadToken
) -> None:
stream_elements: List[IliasPageElement] = []
async with dl as (bar, sink):
page = IliasPage(await self._get_page(element.url), element.url, element)
real_element = page.get_child_elements()[0]
stream_elements = page.get_child_elements()
log.explain(f"Streaming video from real url {real_element.url}")
if len(stream_elements) > 1:
log.explain(f"Found multiple video streams for {element.name}")
else:
log.explain(f"Using single video mode for {element.name}")
stream_element = stream_elements[0]
await self._stream_from_url(real_element.url, sink, bar, is_video=True)
transformed_path = self._transformer.transform(original_path)
if not transformed_path:
raise CrawlError(f"Download returned a path but transform did not for {original_path}")
# We do not have a local cache yet
if self._output_dir.resolve(transformed_path).exists():
log.explain(f"Video for {element.name} existed locally")
else:
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), [original_path.name])
return
contained_video_paths: List[str] = []
for stream_element in stream_elements:
video_path = original_path.parent / stream_element.name
contained_video_paths.append(str(video_path))
maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER)
if not maybe_dl:
continue
async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}")
await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
self.report.add_custom_value(str(original_path), contained_video_paths)
async def _handle_file(
self,
@ -466,6 +564,7 @@ instance's greatest bottleneck.
return None
return self._download_file(element, maybe_dl)
@anoncritical
@_iorepeat(3, "downloading file")
async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None:
assert dl # The function is only reached when dl is not None
@ -521,16 +620,23 @@ instance's greatest bottleneck.
# We repeat this as the login method in shibboleth doesn't handle I/O errors.
# Shibboleth is quite reliable as well, the repeat is likely not critical here.
@_iorepeat(3, "Login")
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
await self._shibboleth_login.login(self.session)
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool:
# Normal ILIAS pages
userlog = soup.find("li", {"id": "userlog"})
if userlog is not None:
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None:
login_button = mainbar.find("button", attrs={"data-action": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login
# Personal Desktop
if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
return True
# Video listing embeds do not have complete ILIAS html. Try to match them by
# their video listing table
video_table = soup.find(

View File

@ -0,0 +1,164 @@
import os
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Awaitable, List, Optional, Pattern, Set, Union
from urllib.parse import urljoin
from bs4 import BeautifulSoup, Tag
from ..config import Config
from ..logging import ProgressBar, log
from ..output_dir import FileSink
from ..utils import soupify
from .crawler import CrawlError
from .http_crawler import HttpCrawler, HttpCrawlerSection
class KitIpdCrawlerSection(HttpCrawlerSection):
def target(self) -> str:
target = self.s.get("target")
if not target:
self.missing_value("target")
if not target.startswith("https://"):
self.invalid_value("target", target, "Should be a URL")
return target
def link_regex(self) -> Pattern[str]:
regex = self.s.get("link_regex", r"^.*/[^/]*\.(?:pdf|zip|c|java)$")
return re.compile(regex)
@dataclass(unsafe_hash=True)
class KitIpdFile:
name: str
url: str
@dataclass
class KitIpdFolder:
name: str
files: List[KitIpdFile]
def explain(self) -> None:
log.explain_topic(f"Folder {self.name!r}")
for file in self.files:
log.explain(f"File {file.name!r}")
def __hash__(self) -> int:
return self.name.__hash__()
class KitIpdCrawler(HttpCrawler):
def __init__(
self,
name: str,
section: KitIpdCrawlerSection,
config: Config,
):
super().__init__(name, section, config)
self._url = section.target()
self._file_regex = section.link_regex()
async def _run(self) -> None:
maybe_cl = await self.crawl(PurePath("."))
if not maybe_cl:
return
tasks: List[Awaitable[None]] = []
async with maybe_cl:
for item in await self._fetch_items():
if isinstance(item, KitIpdFolder):
tasks.append(self._crawl_folder(item))
else:
# Orphan files are placed in the root folder
tasks.append(self._download_file(PurePath("."), item))
await self.gather(tasks)
async def _crawl_folder(self, folder: KitIpdFolder) -> None:
path = PurePath(folder.name)
if not await self.crawl(path):
return
tasks = [self._download_file(path, file) for file in folder.files]
await self.gather(tasks)
async def _download_file(self, parent: PurePath, file: KitIpdFile) -> None:
element_path = parent / file.name
maybe_dl = await self.download(element_path)
if not maybe_dl:
return
async with maybe_dl as (bar, sink):
await self._stream_from_url(file.url, sink, bar)
async def _fetch_items(self) -> Set[Union[KitIpdFile, KitIpdFolder]]:
page = await self.get_page()
elements: List[Tag] = self._find_file_links(page)
items: Set[Union[KitIpdFile, KitIpdFolder]] = set()
for element in elements:
folder_label = self._find_folder_label(element)
if folder_label:
folder = self._extract_folder(folder_label)
if folder not in items:
items.add(folder)
folder.explain()
else:
file = self._extract_file(element)
items.add(file)
log.explain_topic(f"Orphan file {file.name!r}")
log.explain("Attributing it to root folder")
return items
def _extract_folder(self, folder_tag: Tag) -> KitIpdFolder:
files: List[KitIpdFile] = []
name = folder_tag.getText().strip()
container: Tag = folder_tag.findNextSibling(name="table")
for link in self._find_file_links(container):
files.append(self._extract_file(link))
return KitIpdFolder(name, files)
@staticmethod
def _find_folder_label(file_link: Tag) -> Optional[Tag]:
enclosing_table: Tag = file_link.findParent(name="table")
if enclosing_table is None:
return None
return enclosing_table.findPreviousSibling(name=re.compile("^h[1-6]$"))
def _extract_file(self, link: Tag) -> KitIpdFile:
url = self._abs_url_from_link(link)
name = os.path.basename(url)
return KitIpdFile(name, url)
def _find_file_links(self, tag: Union[Tag, BeautifulSoup]) -> List[Tag]:
return tag.findAll(name="a", attrs={"href": self._file_regex})
def _abs_url_from_link(self, link_tag: Tag) -> str:
return urljoin(self._url, link_tag.get("href"))
async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar) -> None:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status == 403:
raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
if resp.content_length:
bar.set_total(resp.content_length)
async for data in resp.content.iter_chunked(1024):
sink.file.write(data)
bar.advance(len(data))
sink.done()
async def get_page(self) -> BeautifulSoup:
async with self.session.get(self._url) as request:
return soupify(await request.read())

View File

@ -5,7 +5,7 @@ from contextlib import asynccontextmanager, contextmanager
# TODO In Python 3.9 and above, ContextManager is deprecated
from typing import AsyncIterator, ContextManager, Iterator, List, Optional
from rich.console import Console, RenderGroup
from rich.console import Console, Group
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
@ -68,7 +68,7 @@ class Log:
if self._download_progress.task_ids:
elements.append(self._download_progress)
group = RenderGroup(*elements) # type: ignore
group = Group(*elements) # type: ignore
self._live.update(group)
@contextmanager

View File

@ -231,7 +231,10 @@ class OutputDirectory:
stat = local_path.stat()
remote_newer = None
if mtime := heuristics.mtime:
# Python on Windows crashes when faced with timestamps around the unix epoch
if heuristics.mtime and (os.name != "nt" or heuristics.mtime.year > 1970):
mtime = heuristics.mtime
remote_newer = mtime.timestamp() > stat.st_mtime
if remote_newer:
log.explain("Remote file seems to be newer")

View File

@ -182,5 +182,13 @@ class Pferd:
something_changed = True
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
for warning in crawler.report.encountered_warnings:
something_changed = True
log.report(f" [bold bright_red]Warning[/] {warning}")
for error in crawler.report.encountered_errors:
something_changed = True
log.report(f" [bold bright_red]Error[/] {error}")
if not something_changed:
log.report(" Nothing changed")

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path, PurePath
from typing import Any, Dict, List, Set
from typing import Any, Dict, List, Optional, Set
class ReportLoadError(Exception):
@ -68,6 +68,13 @@ class Report:
# Files that should have been deleted by the cleanup but weren't
self.not_deleted_files: Set[PurePath] = set()
# Custom crawler-specific data
self.custom: Dict[str, Any] = dict()
# Encountered errors and warnings
self.encountered_warnings: List[str] = []
self.encountered_errors: List[str] = []
@staticmethod
def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
result: Any = data.get(key, [])
@ -81,6 +88,15 @@ class Report:
return result
@staticmethod
def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
result: Dict[str, Any] = data.get(key, {})
if not isinstance(result, dict):
raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")
return result
@classmethod
def load(cls, path: Path) -> "Report":
"""
@ -108,6 +124,9 @@ class Report:
self.delete_file(PurePath(elem))
for elem in self._get_list_of_strs(data, "not_deleted"):
self.not_delete_file(PurePath(elem))
self.custom = self._get_str_dictionary(data, "custom")
self.encountered_errors = self._get_list_of_strs(data, "encountered_errors")
self.encountered_warnings = self._get_list_of_strs(data, "encountered_warnings")
return self
@ -124,6 +143,9 @@ class Report:
"changed": [str(path) for path in sorted(self.changed_files)],
"deleted": [str(path) for path in sorted(self.deleted_files)],
"not_deleted": [str(path) for path in sorted(self.not_deleted_files)],
"custom": self.custom,
"encountered_warnings": self.encountered_warnings,
"encountered_errors": self.encountered_errors,
}
with open(path, "w") as f:
@ -190,3 +212,27 @@ class Report:
"""
self.not_deleted_files.add(path)
def add_custom_value(self, key: str, value: Any) -> None:
"""
Adds a custom value under the passed key, overwriting any existing
"""
self.custom[key] = value
def get_custom_value(self, key: str) -> Optional[Any]:
"""
Retrieves a custom value for the given key.
"""
return self.custom.get(key)
def add_error(self, error: str) -> None:
"""
Adds an error to this report's error list.
"""
self.encountered_errors.append(error)
def add_warning(self, warning: str) -> None:
"""
Adds a warning to this report's warning list.
"""
self.encountered_warnings.append(warning)

View File

@ -1,2 +1,2 @@
NAME = "PFERD"
VERSION = "3.2.0"
VERSION = "3.3.0"

View File

@ -6,11 +6,11 @@ version = attr: PFERD.version.VERSION
packages = find:
python_requires = >=3.8
install_requires =
aiohttp>=3.7.4.post0
beautifulsoup4>=4.9.3
rich>=10.1.0
keyring>=23.0.1
certifi>=2020.12.5
aiohttp>=3.8.1
beautifulsoup4>=4.10.0
rich>=11.0.0
keyring>=23.5.0
certifi>=2021.10.8
[options.entry_points]
console_scripts =