Add simple authenticator

... including some required authenticator infrastructure
This commit is contained in:
Joscha 2021-05-11 00:27:43 +02:00
parent d5f29f01c5
commit 0459ed093e
5 changed files with 155 additions and 0 deletions

52
PFERD/authenticator.py Normal file
View File

@ -0,0 +1,52 @@
from abc import ABC, abstractmethod
from typing import Tuple
from .conductor import TerminalConductor
from .config import Config, Section
class AuthLoadException(Exception):
pass
class AuthException(Exception):
pass
class AuthSection(Section):
pass
class Authenticator(ABC):
def __init__(
self,
name: str,
section: AuthSection,
config: Config,
conductor: TerminalConductor,
) -> None:
"""
Initialize an authenticator from its name and its section in the config
file.
If you are writing your own constructor for your own authenticator,
make sure to call this constructor first (via super().__init__).
May throw an AuthLoadException.
"""
self.name = name
self.conductor = conductor
@abstractmethod
async def credentials(self) -> Tuple[str, str]:
pass
def invalid_credentials(self) -> None:
raise AuthException("Invalid credentials")
def invalid_username(self) -> None:
raise AuthException("Invalid username")
def invalid_password(self) -> None:
raise AuthException("Invalid password")

View File

@ -0,0 +1,19 @@
from configparser import SectionProxy
from typing import Callable, Dict
from ..authenticator import Authenticator
from ..conductor import TerminalConductor
from ..config import Config
from .simple import SimpleAuthenticator, SimpleAuthSection
AuthConstructor = Callable[[
str, # Name (without the "auth:" prefix)
SectionProxy, # Authenticator's section of global config
Config, # Global config
TerminalConductor, # Global conductor instance
], Authenticator]
AUTHENTICATORS: Dict[str, AuthConstructor] = {
"simple": lambda n, s, c, t:
SimpleAuthenticator(n, SimpleAuthSection(s), c, t),
}

View File

@ -0,0 +1,48 @@
from typing import Optional, Tuple
from ..authenticator import Authenticator, AuthSection
from ..conductor import TerminalConductor
from ..config import Config
from ..utils import agetpass, ainput
class SimpleAuthSection(AuthSection):
def username(self) -> Optional[str]:
return self.s.get("username")
def password(self) -> Optional[str]:
return self.s.get("password")
class SimpleAuthenticator(Authenticator):
def __init__(
self,
name: str,
section: SimpleAuthSection,
config: Config,
conductor: TerminalConductor,
) -> None:
super().__init__(name, section, config, conductor)
self.username = section.username()
self.password = section.password()
self.username_fixed = self.username is not None
self.password_fixed = self.password is not None
async def credentials(self) -> Tuple[str, str]:
if self.username is not None and self.password is not None:
return self.username, self.password
async with self.conductor.exclusive_output():
if self.username is None:
self.username = await ainput("Username: ")
else:
print(f"Username: {self.username}")
if self.password is None:
self.password = await agetpass("Password: ")
else:
print("Password: *******")
return self.username, self.password

View File

@ -138,6 +138,15 @@ class Config:
return result
def authenticator_sections(self) -> List[Tuple[str, SectionProxy]]:
result = []
for section_name, section_proxy in self._parser.items():
if section_name.startswith("auth:"):
crawler_name = section_name[5:]
result.append((crawler_name, section_proxy))
return result
@property
def working_dir(self) -> Path:
pathstr = self.default_section.get("working_dir", ".")

View File

@ -3,6 +3,8 @@ from typing import Dict
from rich import print
from rich.markup import escape
from .authenticator import Authenticator
from .authenticators import AUTHENTICATORS
from .conductor import TerminalConductor
from .config import Config
from .crawler import Crawler
@ -17,8 +19,32 @@ class Pferd:
def __init__(self, config: Config):
self._config = config
self._conductor = TerminalConductor()
self._authenticators: Dict[str, Authenticator] = {}
self._crawlers: Dict[str, Crawler] = {}
def _load_authenticators(self) -> None:
abort = False
for name, section in self._config.authenticator_sections():
print(f"[bold bright_cyan]Loading[/] auth:{escape(name)}")
authenticator_type = section.get("type")
authenticator_constructor = AUTHENTICATORS.get(authenticator_type)
if authenticator_constructor is None:
abort = True
t = escape(repr(authenticator_type))
print(f"[red]Error: Unknown authenticator type {t}")
continue
authenticator = authenticator_constructor(
name,
section,
self._config,
self._conductor,
)
self._authenticators[name] = authenticator
if abort:
raise PferdLoadException()
def _load_crawlers(self) -> None:
abort = False
for name, section in self._config.crawler_sections():
@ -44,6 +70,7 @@ class Pferd:
async def run(self) -> None:
try:
self._load_authenticators()
self._load_crawlers()
except PferdLoadException:
print("[bold red]Could not initialize PFERD properly")