# pferd/PFERD/ilias/crawler.py
"""
Contains an ILIAS crawler alongside helper functions.
"""
import datetime
import json
import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
urlunsplit)
import bs4
import requests
from ..errors import FatalException
from ..logging import PrettyLogger
from ..utils import soupify
from .authenticators import IliasAuthenticator
from .date_demangler import demangle_date
from .downloader import IliasDownloadInfo
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)


class IliasDirectoryType(Enum):
    """
    The type of an ILIAS directory.
    """
    FOLDER = "FOLDER"
    VIDEO = "VIDEO"
    EXERCISE = "EXERCISE"


IliasDirectoryFilter = Callable[[Path, IliasDirectoryType], bool]
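

# A directory filter decides which directories the crawler descends into.
# The following is a minimal illustrative example (a sketch, not part of the
# crawler's own API): it accepts everything except opencast video directories.
def example_skip_videos_filter(path: Path, directory_type: IliasDirectoryType) -> bool:
    """Illustrative IliasDirectoryFilter: crawl everything except videos."""
    return directory_type != IliasDirectoryType.VIDEO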


class IliasCrawler:
    # pylint: disable=too-few-public-methods

    """
    A crawler for ILIAS.
    """

    # pylint: disable=too-many-arguments
    def __init__(
            self,
            base_url: str,
            session: requests.Session,
            authenticator: IliasAuthenticator,
            dir_filter: IliasDirectoryFilter
    ):
        """
        Create a new ILIAS crawler.
        """
        self._base_url = base_url
        self._session = session
        self._authenticator = authenticator
        self.dir_filter = dir_filter

    def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
        """
        Create an absolute url from an <a> tag.
        """
        return urljoin(self._base_url, link_tag.get("href"))

    @staticmethod
    def _url_set_query_param(url: str, param: str, value: str) -> str:
        """
        Set a query parameter in a URL, overwriting existing ones with the same name.
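
        Example with a made-up URL (illustrative only):

        >>> IliasCrawler._url_set_query_param(
        ...     "https://example.com/goto.php?target=foo&lang=de",
        ...     "target", "crs_42"
        ... )
        'https://example.com/goto.php?target=crs_42&lang=de'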
"""
scheme, netloc, path, query, fragment = urlsplit(url)
query_parameters = parse_qs(query)
query_parameters[param] = [value]
new_query_string = urlencode(query_parameters, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))

    def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
        """
        Start the crawl process for a course, yielding a list of elements to
        (potentially) download.

        Arguments:
            course_id {str} -- the course id

        Raises:
            FatalException: if an unrecoverable error occurs or the course id is not valid
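
        Example (illustrative, with a made-up course id):
            crawler.crawl_course("1234567")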
"""
# Start crawling at the given course
root_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
if not self._is_course_id_valid(root_url, course_id):
raise FatalException(
"Invalid course id? The URL the server returned did not contain my id."
)
# And treat it as a folder
return self._crawl_folder(Path(""), root_url)

    def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
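        # ILIAS resolves "goto.php?target=crs_<id>" via redirects to the real
        # course page. We assume a valid id survives in the final URL, so we
        # simply check whether it is still present after following redirects.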
        response: requests.Response = self._session.get(root_url)
        return course_id in response.url

    def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
        """
        Crawl the ILIAS personal desktop (and every subelement that can be reached from there).

        Raises:
            FatalException: if an unrecoverable error occurs
        """
        return self._crawl_folder(Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI")

    def _switch_on_crawled_type(
            self,
            path: Path,
            link_element: bs4.Tag,
            url: str
    ) -> List[IliasDownloadInfo]:
        """
        Decide which sub crawler to use for a given top level element.
        """
        parsed_url = urlparse(url)
        LOGGER.debug("Parsed url: %r", parsed_url)

        # File URLs contain "target=file_<id>"
        if "target=file_" in parsed_url.query:
            LOGGER.debug("Interpreted as file.")
            return self._crawl_file(path, link_element, url)

        # Skip forums
        if "cmd=showThreads" in parsed_url.query:
            LOGGER.debug("Skipping forum %r", url)
            return []

        # Everything with a ref_id can *probably* be opened to reveal nested things
        # (video groups, directories, exercises, etc.)
        if "ref_id=" in parsed_url.query:
            LOGGER.debug("Processing folder-like...")
            return self._switch_on_folder_like(path, link_element, url)

        PRETTY.warning(
            "Got unknown element type in switch. I am not sure what horror I found on the"
            f" ILIAS page. The element was at {str(path)!r} and it is {link_element!r}"
        )
        return []

    @staticmethod
    def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasDownloadInfo]:
        """
        Crawl a file.
        """
        # Files have a list of properties (type, modification date, size, etc.)
        # in a series of divs.
        # Find the parent containing all those divs, so we can filter out what we need
        properties_parent: bs4.Tag = link_element.findParent(
            "div", {"class": lambda x: x and "il_ContainerListItem" in x}
        ).select_one(".il_ItemProperties")

        # The first one is always the filetype
        file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()

        # The rest does not have a stable order. Grab the whole text and reg-ex the
        # modification date out of it
        all_properties_text = properties_parent.getText().strip()
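        # e.g. "12. Mai 2020, 16:30", "Gestern, 13:37" or "Today, 9:01"
        # (example values assumed to match the pattern below)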
        modification_date_match = re.search(
            r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
            all_properties_text
        )
        if modification_date_match is None:
            modification_date = None
            PRETTY.warning(f"Could not extract modification date from {all_properties_text!r}")
        else:
            modification_date_str = modification_date_match.group(1)
            modification_date = demangle_date(modification_date_str)

        # Grab the name from the link text
        name = link_element.getText()
        full_path = Path(path, name + "." + file_type)

        return [IliasDownloadInfo(full_path, url, modification_date)]

    def _switch_on_folder_like(
            self,
            parent_path: Path,
            link_element: bs4.Tag,
            url: str
    ) -> List[IliasDownloadInfo]:
        """
        Try crawling something that looks like a folder.
        """
        # pylint: disable=too-many-return-statements
        element_path = Path(parent_path, link_element.getText().strip())

        found_parent: Optional[bs4.Tag] = None
        # We look for the outer div of our inner link, to find information around it
        # (mostly the icon)
        for parent in link_element.parents:
            if "ilContainerListItemOuter" in parent.get("class", []):
                found_parent = parent
                break

        if found_parent is None:
            PRETTY.warning(f"Could not find element icon for {url!r}")
            return []

        # Find the small descriptive icon to figure out the type
        img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon")

        if img_tag is None:
            PRETTY.warning(f"Could not find image tag for {url!r}")
            return []

        directory_type = IliasDirectoryType.FOLDER

        # An opencast video directory is marked by its icon's alt text
        if "opencast" in str(img_tag["alt"]).lower():
            directory_type = IliasDirectoryType.VIDEO

        # An exercise is marked by its icon
        if str(img_tag["src"]).endswith("icon_exc.svg"):
            directory_type = IliasDirectoryType.EXERCISE

        if not self.dir_filter(element_path, directory_type):
            PRETTY.not_searching(element_path, "user filter")
            return []

        PRETTY.searching(element_path)

        # A forum
        if str(img_tag["src"]).endswith("frm.svg"):
            LOGGER.debug("Skipping forum at %r", url)
            PRETTY.not_searching(element_path, "forum")
            return []

        # An exercise
        if directory_type == IliasDirectoryType.EXERCISE:
            LOGGER.debug("Crawling exercises at %r", url)
            return self._crawl_exercises(element_path, url)

        # An external link
        if str(img_tag["src"]).endswith("icon_webr.svg"):
            LOGGER.debug("Skipping external link at %r", url)
            PRETTY.not_searching(element_path, "external link")
            return []

        # Match the opencast video plugin
        if directory_type == IliasDirectoryType.VIDEO:
            LOGGER.debug("Found video site: %r", url)
            return self._crawl_video_directory(element_path, url)

        # Assume it is a folder
        return self._crawl_folder(element_path, self._abs_url_from_link(link_element))

    def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasDownloadInfo]:
        """
        Crawl the video overview site.
        """
        initial_soup = self._get_page(url, {})

        # The page is actually empty but contains a much-needed token in the link below.
        # That token can be used to fetch the *actual* video listing
        content_link: bs4.Tag = initial_soup.select_one("#tab_series a")

        # Fetch the actual video listing. The given parameters return all videos (max 800)
        # in a standalone html page
        video_list_soup = self._get_page(
            self._abs_url_from_link(content_link),
            {"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
        )

        # Video download links are marked with a "Download" link
        direct_download_links: List[bs4.Tag] = video_list_soup.findAll(
            name="a", text=re.compile(r"\s*Download\s*")
        )
        # Video start links are marked with an "Abspielen" link
        video_links: List[bs4.Tag] = video_list_soup.findAll(
            name="a", text=re.compile(r"\s*Abspielen\s*")
        )

        results: List[IliasDownloadInfo] = []

        # If every video has a direct download link, we can skip the player pages entirely
        if len(direct_download_links) == len(video_links):
            for link in direct_download_links:
                results += self._crawl_single_video(video_dir_path, link, True)
        else:
            for link in video_links:
                results += self._crawl_single_video(video_dir_path, link, False)

        return results

    def _crawl_single_video(
            self,
            parent_path: Path,
            link: bs4.Tag,
            direct_download: bool
    ) -> List[IliasDownloadInfo]:
        """
        Crawl a single video based on its "Abspielen" link from the video listing.
        """
        # The link is part of a table with multiple columns, describing metadata.
        # 6th child (1 indexed) is the modification time string
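        # (e.g. "24.04.2020 - 16:45", a made-up value matching the strptime
        # format used below)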
        modification_string = link.parent.parent.parent.select_one(
            "td.std:nth-child(6)"
        ).getText().strip()
        modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")

        # 3rd child (1 indexed) is the title
        title = link.parent.parent.parent.select_one(
            "td.std:nth-child(3)"
        ).getText().strip()
        title += ".mp4"

        video_path: Path = Path(parent_path, title)

        # If the video has a direct download button, we can use that instead
        if direct_download:
            LOGGER.debug("Using direct download for video %r", str(video_path))
            return [IliasDownloadInfo(
                video_path,
                self._abs_url_from_link(link),
                modification_time
            )]

        # Fetch the actual video page. This is a small wrapper page initializing a
        # javascript player. Sadly we cannot execute that JS. The actual video stream
        # url is nowhere on the page, but defined in a JS object inside a script tag,
        # passed to the player library.
        # We do the impossible and RegEx the stream JSON object out of the page's
        # HTML source
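        # The matched object is assumed to look roughly like this (shape
        # inferred from the access path below, not an official format):
        #   {"streams": [{"sources": {"mp4": [{"src": "<video url>", ...}]}}], ...}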
        video_page_url = self._abs_url_from_link(link)
        video_page_soup = self._get_page(video_page_url, {})
        regex: re.Pattern = re.compile(
            r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
        )
        json_match = regex.search(str(video_page_soup))

        if json_match is None:
            PRETTY.warning(f"Could not find json stream info for {video_page_url!r}")
            return []
        json_str = json_match.group(1)

        # parse it
        json_object = json.loads(json_str)
        # and fetch the video url!
        video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]

        return [IliasDownloadInfo(video_path, video_url, modification_time)]

    def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasDownloadInfo]:
        """
        Crawl files offered for download in exercises.
        """
        soup = self._get_page(url, {})

        results: List[IliasDownloadInfo] = []

        # Each assignment is in an accordion container
        assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer")

        for container in assignment_containers:
            # Fetch the container name out of the header to use it in the path
            container_name = container.select_one(".ilAssignmentHeader").getText().strip()
            # Find all download links in the container (this will contain all the files)
            files: List[bs4.Tag] = container.findAll(
                name="a",
                # download links contain the given command class
                attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
                text="Download"
            )

            LOGGER.debug("Found exercise container %r", container_name)

            # Grab each file as we now have the link
            for file_link in files:
                # Two divs, side by side. Left is the name, right is the link.
                # Get the left sibling.
                file_name = file_link.parent.findPrevious(name="div").getText().strip()
                url = self._abs_url_from_link(file_link)
                LOGGER.debug("Found file %r at %r", file_name, url)

                results.append(IliasDownloadInfo(
                    Path(element_path, container_name, file_name),
                    url,
                    None  # We do not have any timestamp
                ))

        return results

    def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasDownloadInfo]:
        """
        Crawl all files in a folder-like element.
        """
        soup = self._get_page(url, {})

        result: List[IliasDownloadInfo] = []

        # Fetch all links and throw them to the general interpreter
        links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
        for link in links:
            abs_url = self._abs_url_from_link(link)
            result += self._switch_on_crawled_type(folder_path, link, abs_url)

        return result

    def _get_page(self, url: str, params: Dict[str, Any]) -> bs4.BeautifulSoup:
        """
        Fetch a page from ILIAS, authenticating when needed.
        """
        LOGGER.debug("Fetching %r", url)

        response = self._session.get(url, params=params)
        content_type = response.headers["content-type"]

        if not content_type.startswith("text/html"):
            # A non-HTML response means we did not get the page we asked for
            raise FatalException(f"Invalid content type {content_type}")

        soup = soupify(response)

        if self._is_logged_in(soup):
            return soup

        LOGGER.info("Not authenticated, changing that...")
        self._authenticator.authenticate(self._session)

        return self._get_page(url, params)

    @staticmethod
    def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
        # Normal ILIAS pages
        userlog = soup.find("li", {"id": "userlog"})
        if userlog is not None:
            LOGGER.debug("Auth: Found #userlog")
            return True

        # Video listing embeds do not have complete ILIAS html. Try to match them by
        # their video listing table
        video_table = soup.find(
            recursive=True,
            name="table",
            attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
        )
        if video_table is not None:
            LOGGER.debug("Auth: Found #tbl_xoct.+")
            return True

        # The individual video player wrapper page has none of the above.
        # Match it by its playerContainer.
        if soup.select_one("#playerContainer") is not None:
            LOGGER.debug("Auth: Found #playerContainer")
            return True

        return False
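

# Illustrative usage sketch (the base URL and course id below are made up;
# "my_authenticator" stands in for any IliasAuthenticator implementation):
#
#     session = requests.Session()
#     crawler = IliasCrawler(
#         "https://ilias.example.com", session, my_authenticator,
#         dir_filter=lambda path, dir_type: True
#     )
#     download_infos = crawler.crawl_course("1234567")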