mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
173 lines
6.5 KiB
Python
173 lines
6.5 KiB
Python
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from rich.markup import escape
|
|
|
|
from .auth import AUTHENTICATORS, Authenticator, AuthError, AuthSection
|
|
from .config import Config, ConfigOptionError
|
|
from .crawl import CRAWLERS, Crawler, CrawlError, CrawlerSection, KitIliasWebCrawler
|
|
from .logging import log
|
|
from .utils import fmt_path
|
|
|
|
|
|
class PferdLoadError(Exception):
|
|
pass
|
|
|
|
|
|
class Pferd:
|
|
def __init__(self, config: Config, cli_crawlers: Optional[List[str]]):
|
|
"""
|
|
May throw PferdLoadError.
|
|
"""
|
|
|
|
self._config = config
|
|
self._crawlers_to_run = self._find_crawlers_to_run(config, cli_crawlers)
|
|
|
|
self._authenticators: Dict[str, Authenticator] = {}
|
|
self._crawlers: Dict[str, Crawler] = {}
|
|
|
|
def _find_config_crawlers(self, config: Config) -> List[str]:
|
|
crawl_sections = []
|
|
|
|
for name, section in config.crawl_sections():
|
|
if CrawlerSection(section).skip():
|
|
log.explain(f"Skipping {name!r}")
|
|
else:
|
|
crawl_sections.append(name)
|
|
|
|
return crawl_sections
|
|
|
|
def _find_cli_crawlers(self, config: Config, cli_crawlers: List[str]) -> List[str]:
|
|
if len(cli_crawlers) != len(set(cli_crawlers)):
|
|
raise PferdLoadError("Some crawlers were selected multiple times")
|
|
|
|
crawl_sections = [name for name, _ in config.crawl_sections()]
|
|
|
|
crawlers_to_run = [] # With crawl: prefix
|
|
unknown_names = [] # Without crawl: prefix
|
|
|
|
for name in cli_crawlers:
|
|
section_name = f"crawl:{name}"
|
|
if section_name in crawl_sections:
|
|
log.explain(f"Crawler section named {section_name!r} exists")
|
|
crawlers_to_run.append(section_name)
|
|
else:
|
|
log.explain(f"There's no crawler section named {section_name!r}")
|
|
unknown_names.append(name)
|
|
|
|
if unknown_names:
|
|
if len(unknown_names) == 1:
|
|
[name] = unknown_names
|
|
raise PferdLoadError(f"There is no crawler named {name!r}")
|
|
else:
|
|
names_str = ", ".join(repr(name) for name in unknown_names)
|
|
raise PferdLoadError(f"There are no crawlers named {names_str}")
|
|
|
|
return crawlers_to_run
|
|
|
|
def _find_crawlers_to_run(self, config: Config, cli_crawlers: Optional[List[str]]) -> List[str]:
|
|
log.explain_topic("Deciding which crawlers to run")
|
|
|
|
if cli_crawlers is None:
|
|
log.explain("No crawlers specified on CLI")
|
|
log.explain("Running crawlers specified in config")
|
|
return self._find_config_crawlers(config)
|
|
else:
|
|
log.explain("Crawlers specified on CLI")
|
|
return self._find_cli_crawlers(config, cli_crawlers)
|
|
|
|
def _load_authenticators(self) -> None:
|
|
for name, section in self._config.auth_sections():
|
|
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
|
|
|
|
auth_type = AuthSection(section).type()
|
|
authenticator_constructor = AUTHENTICATORS.get(auth_type)
|
|
if authenticator_constructor is None:
|
|
raise ConfigOptionError(name, "type", f"Unknown authenticator type: {auth_type!r}")
|
|
|
|
authenticator = authenticator_constructor(name, section, self._config)
|
|
self._authenticators[name] = authenticator
|
|
|
|
def _load_crawlers(self) -> None:
|
|
# Cookie sharing
|
|
kit_ilias_web_paths: Dict[Authenticator, List[Path]] = {}
|
|
|
|
for name, section in self._config.crawl_sections():
|
|
log.print(f"[bold bright_cyan]Loading[/] {escape(name)}")
|
|
|
|
crawl_type = CrawlerSection(section).type()
|
|
crawler_constructor = CRAWLERS.get(crawl_type)
|
|
if crawler_constructor is None:
|
|
raise ConfigOptionError(name, "type", f"Unknown crawler type: {crawl_type!r}")
|
|
|
|
crawler = crawler_constructor(name, section, self._config, self._authenticators)
|
|
self._crawlers[name] = crawler
|
|
|
|
if self._config.default_section.share_cookies():
|
|
if isinstance(crawler, KitIliasWebCrawler):
|
|
crawler.share_cookies(kit_ilias_web_paths)
|
|
|
|
def debug_transforms(self) -> None:
|
|
for name in self._crawlers_to_run:
|
|
crawler = self._crawlers[name]
|
|
log.print("")
|
|
log.print(f"[bold bright_cyan]Debugging transforms[/] for {escape(name)}")
|
|
crawler.debug_transforms()
|
|
|
|
async def run(self, debug_transforms: bool) -> None:
|
|
"""
|
|
May throw ConfigOptionError.
|
|
"""
|
|
|
|
# These two functions must run inside the same event loop as the
|
|
# crawlers, so that any new objects (like Conditions or Futures) can
|
|
# obtain the correct event loop.
|
|
self._load_authenticators()
|
|
self._load_crawlers()
|
|
|
|
if debug_transforms:
|
|
log.output_explain = True
|
|
log.output_report = False
|
|
self.debug_transforms()
|
|
return
|
|
|
|
log.print("")
|
|
|
|
for name in self._crawlers_to_run:
|
|
crawler = self._crawlers[name]
|
|
|
|
log.print(f"[bold bright_cyan]Running[/] {escape(name)}")
|
|
|
|
try:
|
|
await crawler.run()
|
|
except (CrawlError, AuthError) as e:
|
|
log.error(str(e))
|
|
except Exception:
|
|
log.unexpected_exception()
|
|
|
|
def print_report(self) -> None:
|
|
for name in self._crawlers_to_run:
|
|
crawler = self._crawlers.get(name)
|
|
if crawler is None:
|
|
continue # Crawler failed to load
|
|
|
|
log.report("")
|
|
log.report(f"[bold bright_cyan]Report[/] for {escape(name)}")
|
|
|
|
something_changed = False
|
|
for path in sorted(crawler.report.added_files):
|
|
something_changed = True
|
|
log.report(f" [bold bright_green]Added[/] {fmt_path(path)}")
|
|
for path in sorted(crawler.report.changed_files):
|
|
something_changed = True
|
|
log.report(f" [bold bright_yellow]Changed[/] {fmt_path(path)}")
|
|
for path in sorted(crawler.report.deleted_files):
|
|
something_changed = True
|
|
log.report(f" [bold bright_magenta]Deleted[/] {fmt_path(path)}")
|
|
for path in sorted(crawler.report.not_deleted_files):
|
|
something_changed = True
|
|
log.report(f" [bold bright_magenta]Not deleted[/] {fmt_path(path)}")
|
|
|
|
if not something_changed:
|
|
log.report(" Nothing changed")
|