feat: Add authentication to generic ilias dl
@@ -24,10 +24,34 @@ from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningMo
     IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)

 TargetType = Union[str, int]

-_ILIAS_URL = "https://ilias.studium.kit.edu"
+
+class IliasConfig():
+    def __init__(self, base_url: str, client_id: str):
+        self._base_url = base_url
+        self._client_id = client_id
+
+    @property
+    def base_url(self) -> str:
+        return self._base_url
+
+    @property
+    def client_id(self) -> str:
+        return self._client_id


 class IliasWebCrawlerSection(HttpCrawlerSection):
+    def conf(self) -> IliasConfig:
+        base_url = self.s.get("base_url")
+        if not base_url:
+            self.missing_value("base_url")
+
+        client_id = self.s.get("client_id")
+        if not client_id:
+            self.missing_value("client_id")
+
+        return IliasConfig(base_url, client_id)
+
     def target(self) -> TargetType:
         target = self.s.get("target")
         if not target:
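With `conf()`, the crawler section now reads the ILIAS instance from the user's config instead of the hard-coded KIT URL, aborting via `missing_value` if either key is absent. A sketch of a matching config section (the section name and `type` value are assumptions for illustration; only `base_url`, `client_id`, and `target` are read by the code above):

    [crawl:my-ilias]
    type = ilias-web
    base_url = https://ilias.example.com
    client_id = produktiv
    target = 1234567

`target` may also be `desktop` or a full URL on the instance, as the `target()` parser in the next hunk shows.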
@@ -39,7 +63,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
         if target == "desktop":
             # Full personal desktop
             return target
-        if target.startswith(_ILIAS_URL):
+        if target.startswith(self.conf().base_url):
             # ILIAS URL
             return target

@@ -140,7 +164,7 @@ instance's greatest bottleneck.
         """.strip())

         self._auth = auth
-        self._base_url = _ILIAS_URL
+        self._conf = section.conf()

         self._target = section.target()
         self._link_file_redirect_delay = section.link_redirect_delay()
@@ -163,7 +187,7 @@ instance's greatest bottleneck.
     async def _crawl_course(self, course_id: int) -> None:
         # Start crawling at the given course
         root_url = url_set_query_param(
-            self._base_url + "/goto.php", "target", f"crs_{course_id}"
+            self._conf.base_url + "/goto.php", "target", f"crs_{course_id}"
         )

         await self._crawl_url(root_url, expected_id=course_id)
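`url_set_query_param` is a PFERD helper defined elsewhere in the repo; for a course id of 1234567 this builds `<base_url>/goto.php?target=crs_1234567`. A minimal stand-in with the same assumed behavior, using only the standard library (the host below is a placeholder):

    from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

    def url_set_query_param(url: str, name: str, value: str) -> str:
        # Split the URL, overwrite (or add) the one parameter, reassemble.
        scheme, netloc, path, query, fragment = urlsplit(url)
        params = parse_qs(query)
        params[name] = [value]
        return urlunsplit((scheme, netloc, path, urlencode(params, doseq=True), fragment))

    print(url_set_query_param("https://ilias.example.com/goto.php", "target", "crs_1234567"))
    # -> https://ilias.example.com/goto.php?target=crs_1234567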
@@ -171,7 +195,7 @@ instance's greatest bottleneck.
     async def _crawl_desktop(self) -> None:
         appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items"
         appendix = appendix.encode("ASCII").hex()
-        await self._crawl_url(self._base_url + "/gs_content.php?item=" + appendix)
+        await self._crawl_url(self._conf.base_url + "/gs_content.php?item=" + appendix)

     async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None:
         maybe_cl = await self.crawl(PurePath("."))
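The two `appendix` lines hex-encode the ASCII bytes of the main-bar provider identifier, since `gs_content.php` expects its `item` parameter in that form. A quick check of the encoding:

    >>> r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items".encode("ASCII").hex()[:16]
    '494c4941535c5265'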
@@ -844,8 +868,8 @@ instance's greatest bottleneck.
                     continue
                 if elem.name == "img":
                     if src := elem.attrs.get("src", None):
-                        url = urljoin(_ILIAS_URL, src)
-                        if not url.startswith(_ILIAS_URL):
+                        url = urljoin(self._conf.base_url, src)
+                        if not url.startswith(self._conf.base_url):
                             continue
                         log.explain(f"Internalizing {url!r}")
                         img = await self._get_authenticated(url)
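`urljoin` resolves a relative `src` against the configured base URL, but an absolute `src` replaces the base entirely, which is why the `startswith` check is still needed to skip off-instance images (hosts below are placeholders):

    from urllib.parse import urljoin

    urljoin("https://ilias.example.com", "/img/icon.svg")
    # -> 'https://ilias.example.com/img/icon.svg' (internalized)

    urljoin("https://ilias.example.com", "https://cdn.example.org/pic.png")
    # -> 'https://cdn.example.org/pic.png' (filtered out by the startswith check)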
@@ -925,7 +949,63 @@ instance's greatest bottleneck.
             return await request.read()
         raise CrawlError("get_authenticated failed even after authenticating")

-    # ToDo: Is this still required?
+    # ToDo: Is iorepeat still required?
     @_iorepeat(3, "Login", failure_is_error=True)
     async def _authenticate(self) -> None:
-        pass
+        # fill the session with the correct cookies
+        params = {
+            "client_id": self._conf.client_id,
+            "cmd": "force_login",
+        }
+        async with self.session.get(f"{self._conf.base_url}/login.php", params=params) as request:
+            login_page = soupify(await request.read())
+
+        login_form = login_page.find("form", attrs={"name": "formlogin"})
+        if login_form is None:
+            raise CrawlError("Could not find the login form! Specified client id might be invalid.")
+
+        login_url = login_form.attrs.get("action")
+        if login_url is None:
+            raise CrawlError("Could not find the action URL in the login form!")
+
+        username, password = await self._auth.credentials()
+
+        login_data = {
+            "username": username,
+            "password": password,
+            "cmd[doStandardAuthentication]": "Login",
+        }
+
+        # do the actual login
+        async with self.session.post(f"{self._conf.base_url}/{login_url}", data=login_data) as request:
+            soup = soupify(await request.read())
+            if not self._is_logged_in(soup):
+                self._auth.invalidate_credentials()
+
+    @staticmethod
+    def _is_logged_in(soup: BeautifulSoup) -> bool:
+        # Normal ILIAS pages
+        mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
+        if mainbar is not None:
+            login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x})
+            shib_login = soup.find(id="button_shib_login")
+            return not login_button and not shib_login
+
+        # Personal Desktop
+        if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}):
+            return True
+
+        # Video listing embeds do not have complete ILIAS html. Try to match them by
+        # their video listing table
+        video_table = soup.find(
+            recursive=True,
+            name="table",
+            attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
+        )
+        if video_table is not None:
+            return True
+        # The individual video player wrapper page has nothing of the above.
+        # Match it by its playerContainer.
+        if soup.select_one("#playerContainer") is not None:
+            return True
+        return False
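The new `_authenticate` flow is plain form-based ILIAS login: fetch `login.php` with `cmd=force_login` to get the standard form even on Shibboleth-fronted instances, post the credentials to the form's `action` URL, then verify the result. A self-contained sketch of the same flow outside the crawler, assuming only `aiohttp` and `bs4` (host, client id, and the final logged-in check are simplified placeholders, not PFERD code):

    import asyncio

    import aiohttp
    from bs4 import BeautifulSoup

    BASE_URL = "https://ilias.example.com"  # placeholder instance

    async def login(username: str, password: str) -> aiohttp.ClientSession:
        session = aiohttp.ClientSession()
        # Ask for the plain login form, bypassing any Shibboleth redirect
        params = {"client_id": "produktiv", "cmd": "force_login"}
        async with session.get(f"{BASE_URL}/login.php", params=params) as resp:
            page = BeautifulSoup(await resp.read(), "html.parser")

        form = page.find("form", attrs={"name": "formlogin"})
        if form is None:
            raise RuntimeError("no login form; client_id may be wrong")

        data = {
            "username": username,
            "password": password,
            "cmd[doStandardAuthentication]": "Login",
        }
        # The form action is a relative URL on the same instance
        async with session.post(f"{BASE_URL}/{form.attrs['action']}", data=data) as resp:
            soup = BeautifulSoup(await resp.read(), "html.parser")

        # Simplified check: a logged-in page has no login.php link
        if soup.find(attrs={"href": lambda x: x is not None and "login.php" in x}):
            raise RuntimeError("login failed")
        return session

    # asyncio.run(login("user", "pass"))

The crawler version additionally retries via `_iorepeat` and calls `invalidate_credentials()` on failure, so a wrong cached password is re-prompted on the next attempt.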