Add support for non-KIT shibboleth login (#98)

Co-authored-by: Mr-Pine <git@mr-pine.de>
Co-authored-by: I-Al-Istannen <I-Al-Istannen@users.noreply.github.com>
This commit is contained in:
PinieP
2024-11-05 18:30:34 +01:00
committed by GitHub
parent 5983200247
commit 596b6a7688
8 changed files with 226 additions and 244 deletions

View File

@ -23,10 +23,16 @@ from .file_templates import Links, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int]
class LoginTypeLocal:
def __init__(self, client_id: str):
self.client_id = client_id
class IliasWebCrawlerSection(HttpCrawlerSection):
def base_url(self) -> str:
base_url = self.s.get("base_url")
@ -35,12 +41,30 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url
def client_id(self) -> str:
client_id = self.s.get("client_id")
if not client_id:
self.missing_value("client_id")
def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
if login_type == "shibboleth":
return "shibboleth"
if login_type == "local":
client_id = self.s.get("client_id")
if not client_id:
self.missing_value("client_id")
return LoginTypeLocal(client_id)
return client_id
self.invalid_value("login_type", login_type, "Should be <shibboleth | local>")
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value, "No such auth section exists")
return auth
def target(self) -> TargetType:
target = self.s.get("target")
@ -156,7 +180,13 @@ instance's greatest bottleneck.
self._auth = auth
self._base_url = section.base_url()
self._client_id = section.client_id()
self._tfa_auth = section.tfa_auth(authenticators)
self._login_type = section.login()
if isinstance(self._login_type, LoginTypeLocal):
self._client_id = self._login_type.client_id
else:
self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth)
self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay()
@ -179,7 +209,7 @@ instance's greatest bottleneck.
async def _crawl_course(self, course_id: int) -> None:
# Start crawling at the given course
root_url = url_set_query_param(
urljoin(self._base_url, "/goto.php"),
urljoin(self._base_url + "/", "goto.php"),
"target", f"crs_{course_id}",
)
@ -460,11 +490,12 @@ instance's greatest bottleneck.
return ""
return None
auth_id = await self._current_auth_id()
target = await impl()
if target is not None:
return target
await self._authenticate()
await self.authenticate(auth_id)
target = await impl()
if target is not None:
@ -935,38 +966,39 @@ instance's greatest bottleneck.
return await request.read()
raise CrawlError("get_authenticated failed even after authenticating")
# ToDo: Is iorepeat still required?
@_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None:
# fill the session with the correct cookies
params = {
"client_id": self._client_id,
"cmd": "force_login",
}
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
if self._login_type == "shibboleth":
await self._shibboleth_login.login(self.session)
else:
params = {
"client_id": self._client_id,
"cmd": "force_login",
}
async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request:
login_page = soupify(await request.read())
login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action")
if login_url is None:
raise CrawlError("Could not find the action URL in the login form!")
login_url = login_form.attrs.get("action")
if login_url is None:
raise CrawlError("Could not find the action URL in the login form!")
username, password = await self._auth.credentials()
username, password = await self._auth.credentials()
login_data = {
"username": username,
"password": password,
"cmd[doStandardAuthentication]": "Login",
}
login_data = {
"username": username,
"password": password,
"cmd[doStandardAuthentication]": "Login",
}
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read())
if not self._is_logged_in(soup):
self._auth.invalidate_credentials()
# do the actual login
async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request:
soup = soupify(await request.read())
if not self._is_logged_in(soup):
self._auth.invalidate_credentials()
@staticmethod
def _is_logged_in(soup: BeautifulSoup) -> bool: