mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Restructure crawling and auth related modules
This commit is contained in:
20
PFERD/auth/__init__.py
Normal file
20
PFERD/auth/__init__.py
Normal file
@ -0,0 +1,20 @@
|
||||
from configparser import SectionProxy
|
||||
from typing import Callable, Dict
|
||||
|
||||
from ..config import Config
|
||||
from .authenticator import Authenticator, AuthSection
|
||||
from .simple import SimpleAuthenticator, SimpleAuthSection
|
||||
from .tfa import TfaAuthenticator
|
||||
|
||||
AuthConstructor = Callable[[
|
||||
str, # Name (without the "auth:" prefix)
|
||||
SectionProxy, # Authenticator's section of global config
|
||||
Config, # Global config
|
||||
], Authenticator]
|
||||
|
||||
AUTHENTICATORS: Dict[str, AuthConstructor] = {
|
||||
"simple": lambda n, s, c:
|
||||
SimpleAuthenticator(n, SimpleAuthSection(s), c),
|
||||
"tfa": lambda n, s, c:
|
||||
TfaAuthenticator(n, AuthSection(s), c),
|
||||
}
|
81
PFERD/auth/authenticator.py
Normal file
81
PFERD/auth/authenticator.py
Normal file
@ -0,0 +1,81 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Tuple
|
||||
|
||||
from ..config import Config, Section
|
||||
|
||||
|
||||
class AuthLoadException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AuthException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AuthSection(Section):
|
||||
pass
|
||||
|
||||
|
||||
class Authenticator(ABC):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
section: AuthSection,
|
||||
config: Config,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize an authenticator from its name and its section in the config
|
||||
file.
|
||||
|
||||
If you are writing your own constructor for your own authenticator,
|
||||
make sure to call this constructor first (via super().__init__).
|
||||
|
||||
May throw an AuthLoadException.
|
||||
"""
|
||||
|
||||
self.name = name
|
||||
|
||||
@abstractmethod
|
||||
async def credentials(self) -> Tuple[str, str]:
|
||||
pass
|
||||
|
||||
async def username(self) -> str:
|
||||
username, _ = await self.credentials()
|
||||
return username
|
||||
|
||||
async def password(self) -> str:
|
||||
_, password = await self.credentials()
|
||||
return password
|
||||
|
||||
def invalidate_credentials(self) -> None:
|
||||
"""
|
||||
Tell the authenticator that some or all of its credentials are invalid.
|
||||
|
||||
Authenticators should overwrite this function if they have a way to
|
||||
deal with this issue that is likely to result in valid credentials
|
||||
(e. g. prompting the user).
|
||||
"""
|
||||
|
||||
raise AuthException("Invalid credentials")
|
||||
|
||||
def invalidate_username(self) -> None:
|
||||
"""
|
||||
Tell the authenticator that specifically its username is invalid.
|
||||
|
||||
Authenticators should overwrite this function if they have a way to
|
||||
deal with this issue that is likely to result in valid credentials
|
||||
(e. g. prompting the user).
|
||||
"""
|
||||
|
||||
raise AuthException("Invalid username")
|
||||
|
||||
def invalidate_password(self) -> None:
|
||||
"""
|
||||
Tell the authenticator that specifically its password is invalid.
|
||||
|
||||
Authenticators should overwrite this function if they have a way to
|
||||
deal with this issue that is likely to result in valid credentials
|
||||
(e. g. prompting the user).
|
||||
"""
|
||||
|
||||
raise AuthException("Invalid password")
|
68
PFERD/auth/simple.py
Normal file
68
PFERD/auth/simple.py
Normal file
@ -0,0 +1,68 @@
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from ..config import Config
|
||||
from ..logging import log
|
||||
from ..utils import agetpass, ainput
|
||||
from .authenticator import Authenticator, AuthException, AuthSection
|
||||
|
||||
|
||||
class SimpleAuthSection(AuthSection):
|
||||
def username(self) -> Optional[str]:
|
||||
return self.s.get("username")
|
||||
|
||||
def password(self) -> Optional[str]:
|
||||
return self.s.get("password")
|
||||
|
||||
|
||||
class SimpleAuthenticator(Authenticator):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
section: SimpleAuthSection,
|
||||
config: Config,
|
||||
) -> None:
|
||||
super().__init__(name, section, config)
|
||||
|
||||
self._username = section.username()
|
||||
self._password = section.password()
|
||||
|
||||
self._username_fixed = self.username is not None
|
||||
self._password_fixed = self.password is not None
|
||||
|
||||
async def credentials(self) -> Tuple[str, str]:
|
||||
if self._username is not None and self._password is not None:
|
||||
return self._username, self._password
|
||||
|
||||
async with log.exclusive_output():
|
||||
if self._username is None:
|
||||
self._username = await ainput("Username: ")
|
||||
else:
|
||||
print(f"Username: {self._username}")
|
||||
|
||||
if self._password is None:
|
||||
self._password = await agetpass("Password: ")
|
||||
|
||||
# Intentionally returned inside the context manager so we know
|
||||
# they're both not None
|
||||
return self._username, self._password
|
||||
|
||||
def invalidate_credentials(self) -> None:
|
||||
if self._username_fixed and self._password_fixed:
|
||||
raise AuthException("Configured credentials are invalid")
|
||||
|
||||
if not self._username_fixed:
|
||||
self._username = None
|
||||
if not self._password_fixed:
|
||||
self._password = None
|
||||
|
||||
def invalidate_username(self) -> None:
|
||||
if self._username_fixed:
|
||||
raise AuthException("Configured username is invalid")
|
||||
else:
|
||||
self._username = None
|
||||
|
||||
def invalidate_password(self) -> None:
|
||||
if self._password_fixed:
|
||||
raise AuthException("Configured password is invalid")
|
||||
else:
|
||||
self._password = None
|
36
PFERD/auth/tfa.py
Normal file
36
PFERD/auth/tfa.py
Normal file
@ -0,0 +1,36 @@
|
||||
from typing import Tuple
|
||||
|
||||
from ..config import Config
|
||||
from ..logging import log
|
||||
from ..utils import ainput
|
||||
from .authenticator import Authenticator, AuthException, AuthSection
|
||||
|
||||
|
||||
class TfaAuthenticator(Authenticator):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
section: AuthSection,
|
||||
config: Config,
|
||||
) -> None:
|
||||
super().__init__(name, section, config)
|
||||
|
||||
async def username(self) -> str:
|
||||
raise AuthException("TFA authenticator does not support usernames")
|
||||
|
||||
async def password(self) -> str:
|
||||
async with log.exclusive_output():
|
||||
code = await ainput("TFA code: ")
|
||||
return code
|
||||
|
||||
async def credentials(self) -> Tuple[str, str]:
|
||||
raise AuthException("TFA authenticator does not support usernames")
|
||||
|
||||
def invalidate_username(self) -> None:
|
||||
raise AuthException("TFA authenticator does not support usernames")
|
||||
|
||||
def invalidate_password(self) -> None:
|
||||
pass
|
||||
|
||||
def invalidate_credentials(self) -> None:
|
||||
pass
|
Reference in New Issue
Block a user