Split up ilias crawler into multiple files

The ilias crawler contained both a crawler and an HTML parser; they are now
split into two separate modules.
This commit is contained in:
I-Al-Istannen 2021-05-19 21:34:36 +02:00
parent 3300886120
commit 9f03702e69
4 changed files with 488 additions and 471 deletions

View File

@ -0,0 +1,3 @@
from .kit_web_ilias_crawler import KitIliasCrawler, KitIliasCrawlerSection
__all__ = ["KitIliasCrawler", "KitIliasCrawlerSection"]

View File

@ -0,0 +1,452 @@
import json
import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
# TODO In Python 3.9 and above, AsyncContextManager is deprecated
from typing import List, Optional, Union
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup, Tag
from PFERD.utils import url_set_query_params
# Union alias accepting either a str or an int.
# NOTE(review): no definition visible in this module references this alias -
# confirm whether it is still needed here.
TargetType = Union[str, int]
class IliasElementType(Enum):
    """
    The kinds of elements the HTML parser can report for an ILIAS page.

    The string values are part of the enum's interface and must stay stable.
    """

    # Containers and plain content
    EXERCISE = "exercise"
    FILE = "file"
    FOLDER = "folder"
    FORUM = "forum"
    LINK = "link"
    MEETING = "meeting"

    # Video handling: a direct stream url, a link to a player page, a video
    # listing, and a listing link that may still be paginated.
    VIDEO = "video"
    VIDEO_PLAYER = "video_player"
    VIDEO_FOLDER = "video_folder"
    VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
@dataclass
class IliasPageElement:
    """
    A single element discovered on an ILIAS page.

    ``type``, ``url`` and ``name`` are always required; ``mtime`` and
    ``description`` default to ``None`` when the page did not provide them.
    """

    # What kind of element this is
    type: IliasElementType
    # Url the element points to
    url: str
    # Name of the element
    name: str
    # Modification time, if one could be extracted
    mtime: Optional[datetime] = None
    # Description text, if one could be extracted
    description: Optional[str] = None
class IliasPage:
    """
    Parser for a single ILIAS HTML page.

    Wraps the parsed document together with the url it was loaded from and,
    optionally, the element that led to this page. ``get_child_elements``
    inspects the document and returns the elements found on it.
    """

    def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
        """
        :param soup: the parsed page
        :param _page_url: url of the page, used to resolve relative links
        :param source_element: the element this page was reached through, if known
        """
        self._soup = soup
        self._page_url = _page_url
        self._page_type = source_element.type if source_element else None
        self._source_name = source_element.name if source_element else ""

    def get_child_elements(self) -> List[IliasPageElement]:
        """
        Return all child page elements you can find here.

        The page kind is probed in a fixed order: video player, video listing,
        exercise, and finally a normal container listing.
        """
        if self._is_video_player():
            return self._player_to_video()
        if self._is_video_listing():
            return self._find_video_entries()
        if self._is_exercise_file():
            return self._find_exercise_entries()
        return self._find_normal_entries()

    def _is_video_player(self) -> bool:
        """Return whether the page source mentions the paella player config."""
        return "paella_config_file" in str(self._soup)

    def _is_video_listing(self) -> bool:
        """Return whether this page is (or wraps) an opencast video listing."""
        # ILIAS fluff around it
        header_image: Optional[Tag] = self._soup.find(id="headerimage")
        # A header image without a src attribute simply does not match
        if header_image is not None and "opencast" in header_image.get("src", "").lower():
            return True

        # Raw listing without ILIAS fluff
        video_element_table: Optional[Tag] = self._soup.find(
            name="table", id=re.compile(r"tbl_xoct_.+")
        )
        return video_element_table is not None

    def _is_exercise_file(self) -> bool:
        """Return whether this page is an exercise page."""
        # we know it from before
        if self._page_type == IliasElementType.EXERCISE:
            return True

        # We have no suitable parent - let's guess
        header_image: Optional[Tag] = self._soup.find(id="headerimage")
        if header_image is not None and "exc" in header_image.get("src", "").lower():
            return True

        return False

    def _player_to_video(self) -> List[IliasPageElement]:
        """
        Extract the video stream url from a video player page.

        Returns a single VIDEO element named after the source element, or an
        empty list if no stream information could be found in the page source.
        Errors from decoding the embedded JSON or from an unexpected JSON layout
        are not caught here.
        """
        # Fetch the actual video page. This is a small wrapper page initializing a javascript
        # player. Sadly we can not execute that JS. The actual video stream url is nowhere
        # on the page, but defined in a JS object inside a script tag, passed to the player
        # library.
        # We do the impossible and RegEx the stream JSON object out of the page's HTML source
        regex = re.compile(
            r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
        )
        json_match = regex.search(str(self._soup))

        if json_match is None:
            print(f"Could not find json stream info for {self._page_url!r}")
            return []
        json_str = json_match.group(1)

        # parse it
        json_object = json.loads(json_str)
        # and fetch the video url!
        video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
        return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]

    def _find_video_entries(self) -> List[IliasPageElement]:
        """Return the elements of a video page, depending on the stage it is in."""
        # ILIAS has three stages for video pages
        # 1. The initial dummy page without any videos. This page contains the link to the listing
        # 2. The video listing which might be paginated
        # 3. An unpaginated video listing (or at least one that includes 800 videos)
        #
        # We need to figure out where we are.

        video_element_table: Optional[Tag] = self._soup.find(
            name="table", id=re.compile(r"tbl_xoct_.+")
        )

        if video_element_table is None:
            # We are in stage 1
            # The page is actually empty but contains the link to stage 2
            content_link: Optional[Tag] = self._soup.select_one("#tab_series a")
            if content_link is None:
                # Without the link there is nothing to follow; report it like the
                # other recoverable parse problems instead of crashing on None.
                # TODO: Properly log this
                print(f"Could not find the video listing link on {self._page_url!r}")
                return []
            url: str = self._abs_url_from_link(content_link)
            query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
            url = url_set_query_params(url, query_params)
            return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]

        is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None

        if is_paginated and self._page_type != IliasElementType.VIDEO_FOLDER:
            # We are in stage 2 - try to break pagination
            return self._find_video_entries_paginated()

        return self._find_video_entries_no_paging()

    def _find_video_entries_paginated(self) -> List[IliasPageElement]:
        """
        Return a VIDEO_FOLDER element whose url requests 800 rows of the video table.

        Falls back to crawling the current page as-is when the table or its id
        can not be determined.
        """
        table_element: Optional[Tag] = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))

        if table_element is None:
            # TODO: Properly log this
            print(
                "Could not increase elements per page (table not found)."
                " Some might not be crawled!"
            )
            return self._find_video_entries_no_paging()

        id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
        if id_match is None:
            # TODO: Properly log this
            print(
                "Could not increase elements per page (table id not found)."
                " Some might not be crawled!"
            )
            return self._find_video_entries_no_paging()

        table_id = id_match.group(1)

        query_params = {f"tbl_xoct_{table_id}_trows": "800",
                        "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
        url = url_set_query_params(self._page_url, query_params)
        return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]

    def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
        """
        Crawls the "second stage" video page. This page contains the actual video urls.
        """
        # Video start links are marked with an "Abspielen" link
        video_links: List[Tag] = self._soup.findAll(
            name="a", text=re.compile(r"\s*Abspielen\s*")
        )

        # TODO: Sadly the download button is currently broken, so never do that
        return [self._listed_video_to_element(link) for link in video_links]

    def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
        """Build a VIDEO_PLAYER element from a play link inside the video table."""
        # The link is part of a table with multiple columns, describing metadata.
        row = link.parent.parent.parent

        # 6th child (1 indexed) is the modification time string
        modification_string = row.select_one("td.std:nth-child(6)").getText().strip()
        modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")

        # 3rd child (1 indexed) is used as the title
        title = row.select_one("td.std:nth-child(3)").getText().strip()
        title += ".mp4"

        video_name: str = _sanitize_path_name(title)
        video_url = self._abs_url_from_link(link)

        return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)

    def _find_exercise_entries(self) -> List[IliasPageElement]:
        """Return a FILE element for every download link of every assignment."""
        results: List[IliasPageElement] = []

        # Each assignment is in an accordion container
        assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer")

        for container in assignment_containers:
            # Fetch the container name out of the header to use it in the path
            container_name = container.select_one(".ilAssignmentHeader").getText().strip()
            # Find all download links in the container (this will contain all the files)
            files: List[Tag] = container.findAll(
                name="a",
                # download links contain the given command class
                attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
                text="Download"
            )

            # Grab each file as you now have the link
            for file_link in files:
                # Two divs, side by side. Left is the name, right is the link ==> get left
                # sibling
                file_name = file_link.parent.findPrevious(name="div").getText().strip()
                file_name = _sanitize_path_name(file_name)
                url = self._abs_url_from_link(file_link)

                results.append(IliasPageElement(
                    IliasElementType.FILE,
                    url,
                    container_name + "/" + file_name,
                    None  # We do not have any timestamp
                ))

        return results

    def _find_normal_entries(self) -> List[IliasPageElement]:
        """Return the elements of a normal container listing."""
        result: List[IliasPageElement] = []

        # Fetch all links and throw them to the general interpreter
        links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")

        for link in links:
            abs_url = self._abs_url_from_link(link)
            element_name = _sanitize_path_name(link.getText())
            element_type = self._find_type_from_link(element_name, link, abs_url)
            description = self._find_link_description(link)

            if not element_type:
                continue
            if element_type == IliasElementType.MEETING:
                element_name = _sanitize_path_name(self._normalize_meeting_name(element_name))
            elif element_type == IliasElementType.FILE:
                # Files carry extra properties (type, date), handled separately
                result.append(self._file_to_element(element_name, abs_url, link))
                continue

            result.append(IliasPageElement(element_type, abs_url, element_name, description=description))

        return result

    def _find_link_description(self, link: Tag) -> Optional[str]:
        """Return the description text next to a link, or None if there is none."""
        tile: Optional[Tag] = link.findParent("div", {"class": lambda x: x and "il_ContainerListItem" in x})
        if not tile:
            return None
        description_element: Optional[Tag] = tile.find("div", {"class": lambda x: x and "il_Description" in x})
        if not description_element:
            return None
        return description_element.getText().strip()

    def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement:
        """
        Build a FILE element from a file link.

        The file name is the sanitized link text plus the file type taken from
        the item's properties. ``name`` is overwritten with the link text before
        it is used.
        """
        # Files have a list of properties (type, modification date, size, etc.)
        # In a series of divs.
        # Find the parent containing all those divs, so we can filter out what we need
        properties_parent: Tag = link_element.findParent(
            # Guard against divs without a class attribute: the filter is then
            # called with None, and "in None" would raise a TypeError.
            "div", {"class": lambda x: x and "il_ContainerListItem" in x}
        ).select_one(".il_ItemProperties")
        # The first one is always the filetype
        file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()

        # The rest does not have a stable order. Grab the whole text and reg-ex the date
        # out of it
        all_properties_text = properties_parent.getText().strip()
        modification_date_match = re.search(
            r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
            all_properties_text
        )
        if modification_date_match is None:
            modification_date = None
            # TODO: Properly log this
            print(f"Could not extract start date from {all_properties_text!r}")
        else:
            modification_date_str = modification_date_match.group(1)
            modification_date = demangle_date(modification_date_str)

        # Grab the name from the link text
        name = _sanitize_path_name(link_element.getText())
        full_path = name + "." + file_type

        return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)

    @staticmethod
    def _find_type_from_link(
            element_name: str,
            link_element: Tag,
            url: str
    ) -> Optional[IliasElementType]:
        """
        Decides which sub crawler to use for a given top level element.

        Returns None if the type could not be determined.
        """
        parsed_url = urlparse(url)

        # file URLs contain "target=file"
        if "target=file_" in parsed_url.query:
            return IliasElementType.FILE

        # Skip forums
        if "cmd=showThreads" in parsed_url.query:
            return IliasElementType.FORUM

        # Everything with a ref_id can *probably* be opened to reveal nested things
        # video groups, directories, exercises, etc
        if "ref_id=" in parsed_url.query:
            return IliasPage._find_type_from_folder_like(link_element, url)

        # TODO: Log this properly
        print(f"Unknown type: The element was at {str(element_name)!r} and it is {link_element!r})")
        return None

    @staticmethod
    def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]:
        """
        Try crawling something that looks like a folder.

        The type is derived from the icon of the list item surrounding the link.
        Returns None if the surrounding item or its icon can not be found.
        """
        found_parent: Optional[Tag] = None

        # We look for the outer div of our inner link, to find information around it
        # (mostly the icon)
        for parent in link_element.parents:
            # Ancestors without a class attribute must be skipped, not crash the
            # lookup with a KeyError.
            if "ilContainerListItemOuter" in (parent.get("class") or []):
                found_parent = parent
                break

        if found_parent is None:
            # TODO: Log this properly
            print(f"Could not find element icon for {url!r}")
            return None

        # Find the small descriptive icon to figure out the type
        img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")

        if img_tag is None:
            # TODO: Log this properly
            print(f"Could not find image tag for {url!r}")
            return None

        # Missing attributes are treated as empty and fall through to FOLDER
        if "opencast" in str(img_tag.get("alt", "")).lower():
            return IliasElementType.VIDEO_FOLDER

        icon_src = str(img_tag.get("src", ""))
        # Checked in order, first matching suffix wins
        types_by_icon_suffix = (
            ("icon_exc.svg", IliasElementType.EXERCISE),
            ("icon_webr.svg", IliasElementType.LINK),
            ("frm.svg", IliasElementType.FORUM),
            ("sess.svg", IliasElementType.MEETING),
        )
        for suffix, element_type in types_by_icon_suffix:
            if icon_src.endswith(suffix):
                return element_type

        return IliasElementType.FOLDER

    @staticmethod
    def _normalize_meeting_name(meeting_name: str) -> str:
        """
        Normalizes meeting names, which have a relative time as their first part,
        to their date in ISO format.

        The name is returned unchanged if its first part can not be parsed as a date.
        """
        date_portion_str = meeting_name.split(" - ")[0]
        date_portion = demangle_date(date_portion_str)

        if not date_portion:
            return meeting_name

        rest_of_name = meeting_name
        if rest_of_name.startswith(date_portion_str):
            rest_of_name = rest_of_name[len(date_portion_str):]

        return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name

    def _abs_url_from_link(self, link_tag: Tag) -> str:
        """
        Create an absolute url from an <a> tag.
        """
        return urljoin(self._page_url, link_tag.get("href"))
german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez']
english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def demangle_date(date_str: str) -> Optional[datetime]:
"""
Demangle a given date in one of the following formats:
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon yyyy, HH:MM
"""
try:
date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
for german, english in zip(german_months, english_months):
date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english)
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm"
day_part, time_part = date_str.split(",")
day_str, month_str, year_str = day_part.split(" ")
day = int(day_str.strip().replace(".", ""))
month = english_months.index(month_str.strip()) + 1
year = int(year_str.strip())
hour_str, minute_str = time_part.split(":")
hour = int(hour_str)
minute = int(minute_str)
return datetime(year, month, day, hour, minute)
except Exception:
# TODO: Properly log this
print(f"Could not parse date {date_str!r}")
return None
def _format_date_english(date_to_format: date) -> str:
month = english_months[date_to_format.month - 1]
return f"{date_to_format.day:02d}. {month} {date_to_format.year:04d}"
def _yesterday() -> date:
return date.today() - timedelta(days=1)
def _tomorrow() -> date:
return date.today() + timedelta(days=1)
def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-").strip()

View File

@ -1,23 +1,19 @@
import asyncio import asyncio
import json
import re import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from enum import Enum
from pathlib import PurePath from pathlib import PurePath
# TODO In Python 3.9 and above, AsyncContextManager is deprecated # TODO In Python 3.9 and above, AsyncContextManager is deprecated
from typing import Any, Dict, List, Optional, Set, Union from typing import Any, Dict, Optional, Set, Union
from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlsplit, urlunsplit
import aiohttp import aiohttp
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
from PFERD.authenticators import Authenticator
from PFERD.config import Config
from PFERD.crawler import CrawlerSection, HttpCrawler, anoncritical, arepeat
from PFERD.output_dir import Redownload from PFERD.output_dir import Redownload
from PFERD.utils import soupify from PFERD.utils import soupify, url_set_query_param
from ..authenticators import Authenticator from .kit_ilias_html import IliasElementType, IliasPage, IliasPageElement
from ..config import Config
from ..crawler import CrawlerSection, HttpCrawler, anoncritical, arepeat
TargetType = Union[str, int] TargetType = Union[str, int]
@ -58,465 +54,6 @@ class KitIliasCrawlerSection(CrawlerSection):
return self.s.getboolean("link_file_plain_text", fallback=False) return self.s.getboolean("link_file_plain_text", fallback=False)
class IliasElementType(Enum):
EXERCISE = "exercise"
FILE = "file"
FOLDER = "folder"
FORUM = "forum"
LINK = "link"
MEETING = "meeting"
VIDEO = "video"
VIDEO_PLAYER = "video_player"
VIDEO_FOLDER = "video_folder"
VIDEO_FOLDER_MAYBE_PAGINATED = "video_folder_maybe_paginated"
@dataclass
class IliasPageElement:
type: IliasElementType
url: str
name: str
mtime: Optional[datetime] = None
description: Optional[str] = None
class IliasPage:
def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]):
self._soup = soup
self._page_url = _page_url
self._page_type = source_element.type if source_element else None
self._source_name = source_element.name if source_element else ""
def get_child_elements(self) -> List[IliasPageElement]:
"""
Return all child page elements you can find here.
"""
if self._is_video_player():
return self._player_to_video()
if self._is_video_listing():
return self._find_video_entries()
if self._is_exercise_file():
return self._find_exercise_entries()
return self._find_normal_entries()
def _is_video_player(self) -> bool:
return "paella_config_file" in str(self._soup)
def _is_video_listing(self) -> bool:
# ILIAS fluff around it
if self._soup.find(id="headerimage"):
element: Tag = self._soup.find(id="headerimage")
if "opencast" in element.attrs["src"].lower():
return True
# Raw listing without ILIAS fluff
video_element_table: Tag = self._soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
return video_element_table is not None
def _is_exercise_file(self) -> bool:
# we know it from before
if self._page_type == IliasElementType.EXERCISE:
return True
# We have no suitable parent - let's guesss
if self._soup.find(id="headerimage"):
element: Tag = self._soup.find(id="headerimage")
if "exc" in element.attrs["src"].lower():
return True
return False
def _player_to_video(self) -> List[IliasPageElement]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
regex: re.Pattern[str] = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(self._soup))
if json_match is None:
print(f"Could not find json stream info for {self._page_url!r}")
return []
json_str = json_match.group(1)
# parse it
json_object = json.loads(json_str)
# and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return [IliasPageElement(IliasElementType.VIDEO, video_url, self._source_name)]
def _find_video_entries(self) -> List[IliasPageElement]:
# ILIAS has three stages for video pages
# 1. The initial dummy page without any videos. This page contains the link to the listing
# 2. The video listing which might be paginated
# 3. An unpaginated video listing (or at least one that includes 800 videos)
#
# We need to figure out where we are.
video_element_table: Tag = self._soup.find(
name="table", id=re.compile(r"tbl_xoct_.+")
)
if video_element_table is None:
# We are in stage 1
# The page is actually emtpy but contains the link to stage 2
content_link: Tag = self._soup.select_one("#tab_series a")
url: str = self._abs_url_from_link(content_link)
query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = _url_set_query_params(url, query_params)
return [IliasPageElement(IliasElementType.VIDEO_FOLDER_MAYBE_PAGINATED, url, "")]
is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None
if is_paginated and not self._page_type == IliasElementType.VIDEO_FOLDER:
# We are in stage 2 - try to break pagination
return self._find_video_entries_paginated()
return self._find_video_entries_no_paging()
def _find_video_entries_paginated(self) -> List[IliasPageElement]:
table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+"))
if table_element is None:
# TODO: Properly log this
print(
"Could not increase elements per page (table not found)."
" Some might not be crawled!"
)
return self._find_video_entries_no_paging()
id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if id_match is None:
# TODO: Properly log this
print(
"Could not increase elements per page (table id not found)."
" Some might not be crawled!"
)
return self._find_video_entries_no_paging()
table_id = id_match.group(1)
query_params = {f"tbl_xoct_{table_id}_trows": "800",
"cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
url = _url_set_query_params(self._page_url, query_params)
return [IliasPageElement(IliasElementType.VIDEO_FOLDER, url, "")]
def _find_video_entries_no_paging(self) -> List[IliasPageElement]:
"""
Crawls the "second stage" video page. This page contains the actual video urls.
"""
# Video start links are marked with an "Abspielen" link
video_links: List[Tag] = self._soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*")
)
results: List[IliasPageElement] = []
# TODO: Sadly the download button is currently broken, so never do that
for link in video_links:
results.append(self._listed_video_to_element(link))
return results
def _listed_video_to_element(self, link: Tag) -> IliasPageElement:
# The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)"
).getText().strip()
modification_time = datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip()
title += ".mp4"
video_name: str = _sanitize_path_name(title)
video_url = self._abs_url_from_link(link)
return IliasPageElement(IliasElementType.VIDEO_PLAYER, video_url, video_name, modification_time)
def _find_exercise_entries(self) -> List[IliasPageElement]:
results: List[IliasPageElement] = []
# Each assignment is in an accordion container
assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer")
for container in assignment_containers:
# Fetch the container name out of the header to use it in the path
container_name = container.select_one(".ilAssignmentHeader").getText().strip()
# Find all download links in the container (this will contain all the files)
files: List[Tag] = container.findAll(
name="a",
# download links contain the given command class
attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
text="Download"
)
# Grab each file as you now have the link
for file_link in files:
# Two divs, side by side. Left is the name, right is the link ==> get left
# sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip()
file_name = _sanitize_path_name(file_name)
url = self._abs_url_from_link(file_link)
results.append(IliasPageElement(
IliasElementType.FILE,
url,
container_name + "/" + file_name,
None # We do not have any timestamp
))
return results
def _find_normal_entries(self) -> List[IliasPageElement]:
result: List[IliasPageElement] = []
# Fetch all links and throw them to the general interpreter
links: List[Tag] = self._soup.select("a.il_ContainerItemTitle")
for link in links:
abs_url = self._abs_url_from_link(link)
element_name = _sanitize_path_name(link.getText())
element_type = self._find_type_from_link(element_name, link, abs_url)
description = self._find_link_description(link)
if not element_type:
continue
if element_type == IliasElementType.MEETING:
element_name = _sanitize_path_name(self._normalize_meeting_name(element_name))
elif element_type == IliasElementType.FILE:
result.append(self._file_to_element(element_name, abs_url, link))
continue
result.append(IliasPageElement(element_type, abs_url, element_name, description=description))
return result
def _find_link_description(self, link: Tag) -> Optional[str]:
tile: Tag = link.findParent("div", {"class": lambda x: x and "il_ContainerListItem" in x})
if not tile:
return None
description_element: Tag = tile.find("div", {"class": lambda x: x and "il_Description" in x})
if not description_element:
return None
return description_element.getText().strip()
def _file_to_element(self, name: str, url: str, link_element: Tag) -> IliasPageElement:
# Files have a list of properties (type, modification date, size, etc.)
# In a series of divs.
# Find the parent containing all those divs, so we can filter our what we need
properties_parent: Tag = link_element.findParent(
"div", {"class": lambda x: "il_ContainerListItem" in x}
).select_one(".il_ItemProperties")
# The first one is always the filetype
file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()
# The rest does not have a stable order. Grab the whole text and reg-ex the date
# out of it
all_properties_text = properties_parent.getText().strip()
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
all_properties_text
)
if modification_date_match is None:
modification_date = None
# TODO: Properly log this
print(f"Could not extract start date from {all_properties_text!r}")
else:
modification_date_str = modification_date_match.group(1)
modification_date = demangle_date(modification_date_str)
# Grab the name from the link text
name = _sanitize_path_name(link_element.getText())
full_path = name + "." + file_type
return IliasPageElement(IliasElementType.FILE, url, full_path, modification_date)
@staticmethod
def _find_type_from_link(
element_name: str,
link_element: Tag,
url: str
) -> Optional[IliasElementType]:
"""
Decides which sub crawler to use for a given top level element.
"""
parsed_url = urlparse(url)
# file URLs contain "target=file"
if "target=file_" in parsed_url.query:
return IliasElementType.FILE
# Skip forums
if "cmd=showThreads" in parsed_url.query:
return IliasElementType.FORUM
# Everything with a ref_id can *probably* be opened to reveal nested things
# video groups, directories, exercises, etc
if "ref_id=" in parsed_url.query:
return IliasPage._find_type_from_folder_like(link_element, url)
# TODO: Log this properly
print(f"Unknown type: The element was at {str(element_name)!r} and it is {link_element!r})")
return None
@staticmethod
def _find_type_from_folder_like(link_element: Tag, url: str) -> Optional[IliasElementType]:
"""
Try crawling something that looks like a folder.
"""
# pylint: disable=too-many-return-statements
found_parent: Optional[Tag] = None
# We look for the outer div of our inner link, to find information around it
# (mostly the icon)
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
found_parent = parent
break
if found_parent is None:
# TODO: Log this properly
print(f"Could not find element icon for {url!r}")
return None
# Find the small descriptive icon to figure out the type
img_tag: Optional[Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
# TODO: Log this properly
print(f"Could not find image tag for {url!r}")
return None
if "opencast" in str(img_tag["alt"]).lower():
return IliasElementType.VIDEO_FOLDER
if str(img_tag["src"]).endswith("icon_exc.svg"):
return IliasElementType.EXERCISE
if str(img_tag["src"]).endswith("icon_webr.svg"):
return IliasElementType.LINK
if str(img_tag["src"]).endswith("frm.svg"):
return IliasElementType.FORUM
if str(img_tag["src"]).endswith("sess.svg"):
return IliasElementType.MEETING
return IliasElementType.FOLDER
@staticmethod
def _normalize_meeting_name(meeting_name: str) -> str:
"""
Normalizes meeting names, which have a relative time as their first part,
to their date in ISO format.
"""
date_portion_str = meeting_name.split(" - ")[0]
date_portion = demangle_date(date_portion_str)
if not date_portion:
return meeting_name
rest_of_name = meeting_name
if rest_of_name.startswith(date_portion_str):
rest_of_name = rest_of_name[len(date_portion_str):]
return datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") + rest_of_name
def _abs_url_from_link(self, link_tag: Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._page_url, link_tag.get("href"))
german_months = ['Jan', 'Feb', 'Mär', 'Apr', 'Mai', 'Jun', 'Jul', 'Aug', 'Sep', 'Okt', 'Nov', 'Dez']
english_months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def demangle_date(date_str: str) -> Optional[datetime]:
"""
Demangle a given date in one of the following formats:
"Gestern, HH:MM"
"Heute, HH:MM"
"Morgen, HH:MM"
"dd. mon yyyy, HH:MM
"""
try:
date_str = re.sub(r"\s+", " ", date_str)
date_str = re.sub("Gestern|Yesterday", _format_date_english(_yesterday()), date_str, re.I)
date_str = re.sub("Heute|Today", _format_date_english(date.today()), date_str, re.I)
date_str = re.sub("Morgen|Tomorrow", _format_date_english(_tomorrow()), date_str, re.I)
for german, english in zip(german_months, english_months):
date_str = date_str.replace(german, english)
# Remove trailing dots for abbreviations, e.g. "20. Apr. 2020" -> "20. Apr 2020"
date_str = date_str.replace(english + ".", english)
# We now have a nice english String in the format: "dd. mmm yyyy, hh:mm"
day_part, time_part = date_str.split(",")
day_str, month_str, year_str = day_part.split(" ")
day = int(day_str.strip().replace(".", ""))
month = english_months.index(month_str.strip()) + 1
year = int(year_str.strip())
hour_str, minute_str = time_part.split(":")
hour = int(hour_str)
minute = int(minute_str)
return datetime(year, month, day, hour, minute)
except Exception:
# TODO: Properly log this
print(f"Could not parse date {date_str!r}")
return None
def _format_date_english(date_to_format: date) -> str:
month = english_months[date_to_format.month - 1]
return f"{date_to_format.day:02d}. {month} {date_to_format.year:04d}"
def _yesterday() -> date:
return date.today() - timedelta(days=1)
def _tomorrow() -> date:
return date.today() + timedelta(days=1)
def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-").strip()
def _url_set_query_param(url: str, param: str, value: str) -> str:
"""
Set a query parameter in an url, overwriting existing ones with the same name.
"""
scheme, netloc, path, query, fragment = urlsplit(url)
query_parameters = parse_qs(query)
query_parameters[param] = [value]
new_query_string = urlencode(query_parameters, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def _url_set_query_params(url: str, params: Dict[str, str]) -> str:
result = url
for key, val in params.items():
result = _url_set_query_param(result, key, val)
return result
_DIRECTORY_PAGES: Set[IliasElementType] = set([ _DIRECTORY_PAGES: Set[IliasElementType] = set([
IliasElementType.EXERCISE, IliasElementType.EXERCISE,
IliasElementType.FOLDER, IliasElementType.FOLDER,
@ -559,7 +96,7 @@ class KitIliasCrawler(HttpCrawler):
async def _crawl_course(self, course_id: int) -> None: async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course # Start crawling at the given course
root_url = _url_set_query_param( root_url = url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}" self._base_url + "/goto.php", "target", f"crs_{course_id}"
) )

View File

@ -6,7 +6,8 @@ import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
from types import TracebackType from types import TracebackType
from typing import Any, Callable, Generic, Optional, Type, TypeVar from typing import Any, Callable, Dict, Generic, Optional, Type, TypeVar
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
import bs4 import bs4
@ -38,6 +39,30 @@ def soupify(data: bytes) -> bs4.BeautifulSoup:
return bs4.BeautifulSoup(data, "html.parser") return bs4.BeautifulSoup(data, "html.parser")
def url_set_query_param(url: str, param: str, value: str) -> str:
    """
    Set a query parameter in an url, overwriting existing ones with the same name.

    All other parts of the url, including query parameters with an empty value,
    are kept.
    """
    scheme, netloc, path, query, fragment = urlsplit(url)
    # parse_qs drops parameters with an empty value ("?a=&b=1") unless told
    # otherwise, which would silently remove them from the resulting url.
    query_parameters = parse_qs(query, keep_blank_values=True)
    query_parameters[param] = [value]
    new_query_string = urlencode(query_parameters, doseq=True)

    return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def url_set_query_params(url: str, params: Dict[str, str]) -> str:
    """
    Sets multiple query parameters in an url, overwriting existing ones.

    The parameters are applied one after another in the order the dict yields them.
    """
    updated_url = url

    for name in params:
        updated_url = url_set_query_param(updated_url, name, params[name])

    return updated_url
async def prompt_yes_no(query: str, default: Optional[bool]) -> bool: async def prompt_yes_no(query: str, default: Optional[bool]) -> bool:
""" """
Asks the user a yes/no question and returns their choice. Asks the user a yes/no question and returns their choice.