pferd/PFERD/crawler.py

from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, TypeVar

from rich.markup import escape

from .authenticator import Authenticator
from .config import Config, Section
from .limiter import Limiter
from .logging import ProgressBar, log
from .output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload
from .report import MarkConflictError, MarkDuplicateError
from .transformer import Transformer
from .utils import ReusableAsyncContextManager, fmt_path

class CrawlWarning(Exception):
    pass


class CrawlError(Exception):
    pass


Wrapped = TypeVar("Wrapped", bound=Callable[..., None])

def noncritical(f: Wrapped) -> Wrapped:
    """
    Catches and logs a few noncritical exceptions occurring during the
    function call, mainly CrawlWarning.

    If any exception occurs during the function call, the crawler's
    error_free variable is set to False. This includes noncritical
    exceptions.

    Warning: Must only be applied to member functions of the Crawler class!
    """

    def wrapper(*args: Any, **kwargs: Any) -> None:
        if not (args and isinstance(args[0], Crawler)):
            raise RuntimeError("@noncritical must only be applied to Crawler methods")

        crawler = args[0]

        try:
            f(*args, **kwargs)
        except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
            log.warn(str(e))
            crawler.error_free = False
        except:  # noqa: E722 do not use bare 'except'
            crawler.error_free = False
            raise

    return wrapper  # type: ignore
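
# A usage sketch for @noncritical (the method below is hypothetical, not part
# of this file): raising CrawlWarning in a decorated method logs the warning
# and flips error_free instead of aborting the whole run.
#
#     class MyCrawler(Crawler):
#         @noncritical
#         def _check_file(self, path: PurePath) -> None:
#             if not path.name:
#                 raise CrawlWarning(f"Invalid path {fmt_path(path)}")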

AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]])


def anoncritical(f: AWrapped) -> AWrapped:
    """
    An async version of @noncritical.

    Catches and logs a few noncritical exceptions occurring during the
    function call, mainly CrawlWarning.

    If any exception occurs during the function call, the crawler's
    error_free variable is set to False. This includes noncritical
    exceptions.

    Warning: Must only be applied to member functions of the Crawler class!
    """

    async def wrapper(*args: Any, **kwargs: Any) -> None:
        if not (args and isinstance(args[0], Crawler)):
            raise RuntimeError("@anoncritical must only be applied to Crawler methods")

        crawler = args[0]

        try:
            await f(*args, **kwargs)
        except (CrawlWarning, OutputDirError, MarkDuplicateError, MarkConflictError) as e:
            log.warn(str(e))
            crawler.error_free = False
        except:  # noqa: E722 do not use bare 'except'
            crawler.error_free = False
            raise

    return wrapper  # type: ignore
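
# The async variant is applied the same way (hypothetical method for
# illustration):
#
#     class MyCrawler(Crawler):
#         @anoncritical
#         async def _crawl_url(self, url: str) -> None:
#             ...  # a CrawlWarning raised here is logged, not propagated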

class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
    def __init__(self, limiter: Limiter, desc: str):
        super().__init__()

        self._limiter = limiter
        self._desc = desc

    async def _on_aenter(self) -> ProgressBar:
        await self._stack.enter_async_context(self._limiter.limit_crawl())
        bar = self._stack.enter_context(log.crawl_bar(self._desc))
        return bar


class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
    def __init__(self, limiter: Limiter, fs_token: FileSinkToken, desc: str):
        super().__init__()

        self._limiter = limiter
        self._fs_token = fs_token
        self._desc = desc

    async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
        # Downloads count against the download limit, not the crawl limit
        await self._stack.enter_async_context(self._limiter.limit_download())
        sink = await self._stack.enter_async_context(self._fs_token)
        bar = self._stack.enter_context(log.download_bar(self._desc))
        return bar, sink
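
# A consumption sketch (hypothetical crawler code; fetch_bytes is a made-up
# helper, and the sink.file / sink.done() interface is assumed from
# output_dir): Crawler.download below hands out a DownloadToken, which is
# entered to obtain a progress bar and a sink to write the file to.
#
#     token = await self.download(path, mtime=mtime)
#     if token is not None:
#         async with token as (bar, sink):
#             data = await fetch_bytes(url)  # bar can report progress here
#             sink.file.write(data)
#             sink.done()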

class CrawlerSection(Section):
    def output_dir(self, name: str) -> Path:
        # TODO Use removeprefix() after switching to 3.9
        if name.startswith("crawl:"):
            name = name[len("crawl:"):]
        return Path(self.s.get("output_dir", name)).expanduser()

    def redownload(self) -> Redownload:
        value = self.s.get("redownload", "never-smart")
        try:
            return Redownload.from_string(value)
        except ValueError as e:
            self.invalid_value(
                "redownload",
                value,
                str(e).capitalize(),
            )

    def on_conflict(self) -> OnConflict:
        value = self.s.get("on_conflict", "prompt")
        try:
            return OnConflict.from_string(value)
        except ValueError as e:
            self.invalid_value(
                "on_conflict",
                value,
                str(e).capitalize(),
            )

    def transform(self) -> str:
        return self.s.get("transform", "")

    def max_concurrent_tasks(self) -> int:
        value = self.s.getint("max_concurrent_tasks", fallback=1)
        if value <= 0:
            self.invalid_value("max_concurrent_tasks", value,
                               "Must be greater than 0")
        return value

    def max_concurrent_downloads(self) -> int:
        tasks = self.max_concurrent_tasks()
        value = self.s.getint("max_concurrent_downloads", fallback=None)
        if value is None:
            return tasks
        if value <= 0:
            self.invalid_value("max_concurrent_downloads", value,
                               "Must be greater than 0")
        if value > tasks:
            self.invalid_value("max_concurrent_downloads", value,
                               "Must not be greater than max_concurrent_tasks")
        return value

    def delay_between_tasks(self) -> float:
        value = self.s.getfloat("delay_between_tasks", fallback=0.0)
        if value < 0:
            self.invalid_value("delay_between_tasks", value,
                               "Must not be negative")
        return value

    def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator:
        value = self.s.get("auth")
        if value is None:
            self.missing_value("auth")

        auth = authenticators.get(value)
        if auth is None:
            self.invalid_value("auth", value, "No such auth section exists")
        return auth
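
# For reference, a "crawl:" config section that these accessors read could
# look like this (the keys are the ones queried above; all values are
# illustrative):
#
#     [crawl:example]
#     output_dir = ./example
#     redownload = never-smart
#     on_conflict = prompt
#     max_concurrent_tasks = 4
#     max_concurrent_downloads = 2
#     delay_between_tasks = 0.5
#     auth = example-auth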

class Crawler(ABC):
    def __init__(
            self,
            name: str,
            section: CrawlerSection,
            config: Config,
    ) -> None:
        """
        Initialize a crawler from its name and its section in the config
        file.

        If you are writing your own constructor for your own crawler, make
        sure to call this constructor first (via super().__init__).

        May throw a CrawlerLoadException.
        """

        self.name = name
        self.error_free = True

        self._limiter = Limiter(
            task_limit=section.max_concurrent_tasks(),
            download_limit=section.max_concurrent_downloads(),
            task_delay=section.delay_between_tasks(),
        )

        self._transformer = Transformer(section.transform())

        self._output_dir = OutputDirectory(
            config.default_section.working_dir() / section.output_dir(name),
            section.redownload(),
            section.on_conflict(),
        )

    async def crawl(self, path: PurePath) -> Optional[CrawlToken]:
        log.explain_topic(f"Decision: Crawl {fmt_path(path)}")

        if self._transformer.transform(path) is None:
            log.explain("Answer: No")
            return None

        log.explain("Answer: Yes")
        desc = f"[bold bright_cyan]Crawling[/] {escape(fmt_path(path))}"
        return CrawlToken(self._limiter, desc)

    async def download(
            self,
            path: PurePath,
            mtime: Optional[datetime] = None,
            redownload: Optional[Redownload] = None,
            on_conflict: Optional[OnConflict] = None,
    ) -> Optional[DownloadToken]:
        log.explain_topic(f"Decision: Download {fmt_path(path)}")

        transformed_path = self._transformer.transform(path)
        if transformed_path is None:
            log.explain("Answer: No")
            return None

        fs_token = await self._output_dir.download(transformed_path, mtime, redownload, on_conflict)
        if fs_token is None:
            log.explain("Answer: No")
            return None

        log.explain("Answer: Yes")
        desc = f"[bold bright_cyan]Downloading[/] {escape(fmt_path(path))}"
        return DownloadToken(self._limiter, fs_token, desc)

    async def _cleanup(self) -> None:
        log.explain_topic("Decision: Clean up files")
        if self.error_free:
            log.explain("No warnings or errors occurred during this run")
            log.explain("Answer: Yes")
            await self._output_dir.cleanup()
        else:
            log.explain("Warnings or errors occurred during this run")
            log.explain("Answer: No")

    async def run(self) -> None:
        """
        Start the crawling process. Call this function if you want to use a
        crawler.
        """

        with log.show_progress():
            self._output_dir.prepare()
            await self._run()
            await self._cleanup()

    @abstractmethod
    async def _run(self) -> None:
        """
        Override this function if you are writing a crawler.

        This function must not return before all crawling is complete. To
        crawl multiple things concurrently, asyncio.gather can be used.
        """

        pass
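
# A minimal subclass sketch (hypothetical; _handle_file is a made-up helper):
# _run crawls a fixed list of paths concurrently via asyncio.gather, as the
# docstring above suggests.
#
#     class ExampleCrawler(Crawler):
#         async def _run(self) -> None:
#             paths = [PurePath("a.txt"), PurePath("b.txt")]
#             await asyncio.gather(*(self._handle_file(p) for p in paths))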