pferd/PFERD/ilias/crawler.py

319 lines
11 KiB
Python
Raw Normal View History

2020-04-21 13:32:03 +02:00
"""
Contains an ILIAS crawler alongside helper functions.
"""
import datetime
import json
2020-04-21 13:32:03 +02:00
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
urlunsplit)
import bs4
from ..cookie_jar import CookieJar
from ..utils import soupify
from .authenticators import IliasAuthenticator
2020-04-22 12:44:41 +02:00
from .date_demangler import demangle_date
2020-04-21 13:32:03 +02:00
from .downloader import IliasDownloadInfo
LOGGER = logging.getLogger(__name__)
class IliasCrawler:
# pylint: disable=too-few-public-methods
"""
A crawler for ILIAS.
"""
def __init__(self, authenticator: IliasAuthenticator, base_url: str, course_id: str):
"""
Create a new ILIAS crawler.
"""
self._cookie_jar = CookieJar(Path("/tmp/test/cookies"))
self._cookie_jar.load_cookies()
self._base_url = base_url
self._course_id = course_id
self._session = self._cookie_jar.create_session()
self._authenticator = authenticator
def _abs_url_from_link(self, link_tag: bs4.Tag) -> str:
"""
Create an absolute url from an <a> tag.
"""
return urljoin(self._base_url, link_tag.get("href"))
@staticmethod
def _url_set_query_param(url: str, param: str, value: str) -> str:
scheme, netloc, path, query, fragment = urlsplit(url)
query_parameters = parse_qs(query)
query_parameters[param] = [value]
new_query_string = urlencode(query_parameters, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def crawl(self) -> List[IliasDownloadInfo]:
"""
Starts the crawl process, yielding a list of elements to (potentially) download.
"""
root_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{self._course_id}"
)
return self._crawl_folder(Path(""), root_url)
def _switch_on_crawled_type(
self,
path: Path,
link_element: bs4.Tag,
url: str
) -> List[IliasDownloadInfo]:
"""
Decides which sub crawler to use for a given top level element.
"""
parsed_url = urlparse(url)
LOGGER.debug("Parsed url: %r", parsed_url)
2020-04-21 13:32:03 +02:00
if "target=file_" in parsed_url.query:
LOGGER.debug("Interpreted as file.")
2020-04-21 13:32:03 +02:00
return self._crawl_file(path, link_element, url)
# Skip forums
if "cmd=showThreads" in parsed_url.query:
LOGGER.debug("Skipping forum %r", url)
2020-04-21 13:32:03 +02:00
return []
if "ref_id=" in parsed_url.query:
LOGGER.debug("Processing folder-like...")
return self._switch_on_folder_like(path, link_element, url)
LOGGER.warning("Got unknown type, %r, %r, %r", path, link_element, url)
2020-04-21 13:32:03 +02:00
# TODO: Other types
raise Exception("Implement me!")
@staticmethod
def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasDownloadInfo]:
"""
Crawls a file.
"""
properties_parent: bs4.Tag = link_element.findParent(
"div", {"class": lambda x: "il_ContainerListItem" in x}
).select_one(".il_ItemProperties")
file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip()
all_properties_text = properties_parent.getText().strip()
2020-04-22 12:44:41 +02:00
modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern)|(Heute)), \d+:\d+)",
all_properties_text
)
if modification_date_match is None:
modification_date = datetime.datetime.now()
LOGGER.warning("Could not extract start date from %r", all_properties_text)
else:
modification_date_str = modification_date_match.group(1)
modification_date = demangle_date(modification_date_str)
2020-04-21 13:32:03 +02:00
name = link_element.getText()
full_path = Path(path, name + "." + file_type)
match_result = re.match(r".+target=file_(\d+).+", url)
if match_result is None:
LOGGER.warning("Could not download file %r", url)
2020-04-21 13:32:03 +02:00
return []
2020-04-22 12:44:41 +02:00
return [IliasDownloadInfo(full_path, url, modification_date)]
2020-04-21 13:32:03 +02:00
def _switch_on_folder_like(
self,
path: Path,
link_element: bs4.Tag,
url: str
) -> List[IliasDownloadInfo]:
found_parent: Optional[bs4.Tag] = None
for parent in link_element.parents:
if "ilContainerListItemOuter" in parent["class"]:
found_parent = parent
break
if found_parent is None:
LOGGER.warning("Could not find element icon for %r", url)
2020-04-21 13:32:03 +02:00
return []
img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon")
if img_tag is None:
LOGGER.warning("Could not find image tag for %r", url)
2020-04-21 13:32:03 +02:00
return []
if str(img_tag["src"]).endswith("frm.svg"):
LOGGER.debug("Skipping forum at %r", url)
2020-04-21 13:32:03 +02:00
return []
element_path = Path(path, link_element.getText().strip())
if str(img_tag["src"]).endswith("icon_exc.svg"):
LOGGER.debug("Crawling exercises at %r", url)
return self._crawl_exercises(element_path, url)
if "opencast" in str(img_tag["alt"]).lower():
LOGGER.debug("Found video site: %r", url)
return self._crawl_video_directory(element_path, url)
2020-04-21 13:32:03 +02:00
# Assume it is a folder
return self._crawl_folder(element_path, self._abs_url_from_link(link_element))
2020-04-21 13:32:03 +02:00
2020-04-22 01:37:34 +02:00
def _crawl_video_directory(self, path: Path, url: str) -> List[IliasDownloadInfo]:
initial_soup = self._get_page(url, {})
content_link: bs4.Tag = initial_soup.select_one("#tab_series a")
video_list_soup = self._get_page(
self._abs_url_from_link(content_link),
{"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
video_links: List[bs4.Tag] = video_list_soup.findAll(
name="a", text=re.compile(r"\s*Abspielen\s*")
)
results: List[IliasDownloadInfo] = []
for link in video_links:
2020-04-22 01:37:34 +02:00
results += self._crawl_single_video(path, link)
2020-04-22 01:37:34 +02:00
return results
2020-04-22 01:37:34 +02:00
def _crawl_single_video(self, path: Path, link: bs4.Tag) -> List[IliasDownloadInfo]:
video_page_url = self._abs_url_from_link(link)
2020-04-22 01:37:34 +02:00
modification_string = link.parent.parent.parent.select_one(
"td.std:nth-child(6)"
).getText().strip()
modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M")
2020-04-22 01:37:34 +02:00
title = link.parent.parent.parent.select_one(
"td.std:nth-child(3)"
).getText().strip()
2020-04-22 12:44:41 +02:00
title += ".mp4"
2020-04-22 01:37:34 +02:00
video_page_soup = self._get_page(video_page_url, {})
regex: re.Pattern = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
json_match = regex.search(str(video_page_soup))
2020-04-22 01:37:34 +02:00
if json_match is None:
LOGGER.warning("Could not find json stream info for %r", video_page_url)
return []
json_str = json_match.group(1)
2020-04-22 01:37:34 +02:00
json_object = json.loads(json_str)
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return [IliasDownloadInfo(Path(path, title), video_url, modification_time)]
def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasDownloadInfo]:
soup = self._get_page(url, {})
results: List[IliasDownloadInfo] = []
assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer")
for container in assignment_containers:
container_name = container.select_one(".ilAssignmentHeader").getText().strip()
files: List[bs4.Tag] = container.findAll(
name="a",
attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
text="Download"
)
LOGGER.debug("Found exercise container %r", container_name)
end_date: datetime.datetime = datetime.datetime.now()
end_date_header: bs4.Tag = container.find(name="div", text="Abgabetermin")
if end_date_header is not None:
end_date_text = end_date_header.findNext("div").getText().strip()
end_date = demangle_date(end_date_text)
for file_link in files:
file_name = file_link.parent.findPrevious(name="div").getText().strip()
url = self._abs_url_from_link(file_link)
LOGGER.debug("Found file %r at %r", file_name, url)
results.append(IliasDownloadInfo(
Path(element_path, container_name, file_name),
url,
end_date
))
return results
2020-04-21 13:32:03 +02:00
def _crawl_folder(self, path: Path, url: str) -> List[IliasDownloadInfo]:
soup = self._get_page(url, {})
result: List[IliasDownloadInfo] = []
links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
for link in links:
abs_url = self._abs_url_from_link(link)
result += self._switch_on_crawled_type(path, link, abs_url)
return result
def _get_page(self, url: str, params: Dict[str, Any]) -> bs4.BeautifulSoup:
"""
Fetches a page from ILIAS, authenticating when needed.
"""
LOGGER.debug("Fetching %r", url)
2020-04-21 13:32:03 +02:00
response = self._session.get(url, params=params)
content_type = response.headers["content-type"]
if not content_type.startswith("text/html"):
# TODO: Correct exception type
raise Exception(f"Invalid content type {content_type}")
soup = soupify(response)
if self._is_logged_in(soup):
return soup
LOGGER.info("Not authenticated, changing that...")
self._authenticator.authenticate(self._session)
self._cookie_jar.save_cookies("Authed")
return self._get_page(url, params)
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
userlog = soup.find("li", {"id": "userlog"})
if userlog is not None:
2020-04-22 12:44:41 +02:00
LOGGER.debug("Auth: Found #userlog")
return True
video_table = soup.find(
recursive=True,
name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
)
if video_table is not None:
2020-04-22 12:44:41 +02:00
LOGGER.debug("Auth: Found #tbl_xoct.+")
return True
if soup.select_one("#playerContainer") is not None:
2020-04-22 12:44:41 +02:00
LOGGER.debug("Auth: Found #playerContainer")
return True
return False
2020-04-21 13:32:03 +02:00
def run_as_test(ilias_url: str, course_id: int) -> List[IliasDownloadInfo]:
from ..organizer import Organizer
from .authenticators import KitShibbolethAuthenticator
crawler = IliasCrawler(KitShibbolethAuthenticator(), ilias_url, str(course_id))
return crawler.crawl()