pferd/PFERD/crawler.py

350 lines
11 KiB
Python
Raw Normal View History

import asyncio
2021-04-29 13:44:29 +02:00
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
2021-05-06 01:02:40 +02:00
from datetime import datetime
2021-05-05 23:45:10 +02:00
from pathlib import Path, PurePath
# TODO In Python 3.9 and above, AsyncContextManager is deprecated
2021-05-16 14:32:53 +02:00
from typing import Any, AsyncContextManager, AsyncIterator, Awaitable, Callable, Dict, Optional, TypeVar
2021-04-29 13:44:29 +02:00
2021-05-13 22:28:14 +02:00
import aiohttp
2021-04-29 13:44:29 +02:00
from rich.markup import escape
2021-05-13 18:57:20 +02:00
from .authenticator import Authenticator
2021-05-05 23:45:10 +02:00
from .config import Config, Section
2021-04-29 13:44:29 +02:00
from .limiter import Limiter
from .logging import ProgressBar, log
2021-05-06 01:02:40 +02:00
from .output_dir import FileSink, OnConflict, OutputDirectory, Redownload
from .transformer import Transformer
2021-05-19 17:32:23 +02:00
from .version import NAME, VERSION
2021-04-29 13:44:29 +02:00
class CrawlWarning(Exception):
    """
    Non-fatal crawl problem.

    The @noncritical and @anoncritical decorators in this module log it, mark
    the crawler as not error-free and then continue without re-raising.
    """
class CrawlError(Exception):
    """
    Fatal crawl problem.

    The @noncritical and @anoncritical decorators in this module log it, mark
    the crawler as not error-free and re-raise it to the caller.
    """
2021-05-09 01:45:01 +02:00
# Type variable for synchronous callables that return None. The decorators in
# this module use it to declare that they hand back the same callable type
# they were given.
Wrapped = TypeVar("Wrapped", bound=Callable[..., None])
2021-05-09 01:45:01 +02:00
def noncritical(f: Wrapped) -> Wrapped:
    """
    Decorator for Crawler methods that turns a CrawlWarning into a log message.

    Behaviour of the wrapped call:

    - CrawlWarning is logged and swallowed; the wrapper returns normally.
    - CrawlError is logged and re-raised.
    - Any other exception is re-raised without being logged.

    In all three cases the crawler's error_free variable is set to False, so
    the fact that something went wrong is never lost.

    Warning: Must only be applied to member functions of the Crawler class!

    Raises:
        RuntimeError: If the first positional argument is not a Crawler.
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    from functools import wraps

    @wraps(f)  # keep the wrapped method's name and docstring
    def wrapper(*args: Any, **kwargs: Any) -> None:
        if not (args and isinstance(args[0], Crawler)):
            raise RuntimeError("@noncritical must only applied to Crawler methods")

        crawler = args[0]

        try:
            f(*args, **kwargs)
        except CrawlWarning as e:
            log.print(f"[bold bright_red]Warning[/] {escape(str(e))}")
            crawler.error_free = False
        except CrawlError as e:
            # TODO Don't print error, just pass it on upwards
            log.print(f"[bold bright_red]Error[/] [red]{escape(str(e))}")
            crawler.error_free = False
            raise
        except BaseException:
            # Anything else (including interruption) also means the crawl did
            # not finish cleanly; record that, then let it propagate untouched.
            crawler.error_free = False
            raise

    return wrapper  # type: ignore
2021-05-09 01:45:01 +02:00
# Async counterpart of Wrapped: a type variable for callables whose result is
# an awaitable resolving to None.
AWrapped = TypeVar("AWrapped", bound=Callable[..., Awaitable[None]])
2021-05-09 01:45:01 +02:00
def anoncritical(f: AWrapped) -> AWrapped:
    """
    An async version of @noncritical.

    Behaviour of the awaited call:

    - CrawlWarning is logged and swallowed; the wrapper returns normally.
    - CrawlError is logged and re-raised.
    - Any other exception is re-raised without being logged.

    In all three cases the crawler's error_free variable is set to False, so
    the fact that something went wrong is never lost.

    Warning: Must only be applied to member functions of the Crawler class!

    Raises:
        RuntimeError: If the first positional argument is not a Crawler.
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    from functools import wraps

    @wraps(f)  # keep the wrapped method's name and docstring
    async def wrapper(*args: Any, **kwargs: Any) -> None:
        if not (args and isinstance(args[0], Crawler)):
            raise RuntimeError("@anoncritical must only applied to Crawler methods")

        crawler = args[0]

        try:
            await f(*args, **kwargs)
        except CrawlWarning as e:
            log.print(f"[bold bright_red]Warning[/] {escape(str(e))}")
            crawler.error_free = False
        except CrawlError as e:
            log.print(f"[bold bright_red]Error[/] [red]{escape(str(e))}")
            crawler.error_free = False
            raise
        except BaseException:
            # Anything else (including task cancellation) also means the crawl
            # did not finish cleanly; record that, then let it propagate.
            crawler.error_free = False
            raise

    return wrapper  # type: ignore
2021-05-05 23:45:10 +02:00
class CrawlerSection(Section):
    """
    Typed accessors for the options of a crawler's config section.

    Each accessor reads one option from self.s and reports unusable values
    through self.invalid_value / self.missing_value (defined on Section).
    """

    def output_dir(self, name: str) -> Path:
        """
        Return the configured "output_dir", with "~" expanded.

        If the option is absent, the section name is used as fallback, with a
        leading "crawl:" stripped.
        """
        # TODO Use removeprefix() after switching to 3.9
        prefix = "crawl:"
        fallback = name[len(prefix):] if name.startswith(prefix) else name

        configured = self.s.get("output_dir", fallback)
        return Path(configured).expanduser()

    def redownload(self) -> Redownload:
        """Parse the "redownload" option (default "never-smart")."""
        raw = self.s.get("redownload", "never-smart")

        try:
            policy = Redownload.from_string(raw)
        except ValueError as err:
            self.invalid_value("redownload", raw, str(err).capitalize())
        else:
            return policy

    def on_conflict(self) -> OnConflict:
        """Parse the "on_conflict" option (default "prompt")."""
        raw = self.s.get("on_conflict", "prompt")

        try:
            policy = OnConflict.from_string(raw)
        except ValueError as err:
            self.invalid_value("on_conflict", raw, str(err).capitalize())
        else:
            return policy

    def transform(self) -> str:
        """Return the "transform" option, or an empty string if it is unset."""
        rules = self.s.get("transform", "")
        return rules

    def max_concurrent_tasks(self) -> int:
        """Return "max_concurrent_tasks" (default 1); it must be positive."""
        limit = self.s.getint("max_concurrent_tasks", fallback=1)

        if limit <= 0:
            self.invalid_value("max_concurrent_tasks", limit, "Must be greater than 0")

        return limit

    def max_concurrent_downloads(self) -> int:
        """
        Return "max_concurrent_downloads".

        Defaults to max_concurrent_tasks() when unset. Otherwise it must be
        positive and must not exceed max_concurrent_tasks().
        """
        # The task limit is validated first, exactly as a direct call would.
        task_limit = self.max_concurrent_tasks()

        limit = self.s.getint("max_concurrent_downloads", fallback=None)
        if limit is None:
            return task_limit

        if limit <= 0:
            self.invalid_value("max_concurrent_downloads", limit, "Must be greater than 0")

        if limit > task_limit:
            self.invalid_value(
                "max_concurrent_downloads",
                limit,
                "Must not be greater than max_concurrent_tasks",
            )

        return limit

    def delay_between_tasks(self) -> float:
        """Return "delay_between_tasks" (default 0.0); it must not be negative."""
        delay = self.s.getfloat("delay_between_tasks", fallback=0.0)

        if delay < 0:
            self.invalid_value("delay_between_tasks", delay, "Must not be negative")

        return delay

    def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator:
        """
        Look up the authenticator named by the "auth" option.

        A missing option is reported via missing_value; a name that is not a
        key of `authenticators` is reported via invalid_value.
        """
        section_name = self.s.get("auth")
        if section_name is None:
            self.missing_value("auth")

        authenticator = authenticators.get(section_name)
        if authenticator is None:
            self.invalid_value("auth", section_name, "No such auth section exists")

        return authenticator
2021-05-05 23:45:10 +02:00
2021-04-29 13:44:29 +02:00
class Crawler(ABC):
    """
    Abstract base class for crawlers.

    Subclasses implement crawl(); callers start a crawler through run().
    """

    def __init__(
            self,
            name: str,
            section: CrawlerSection,
            config: Config,
    ) -> None:
        """
        Initialize a crawler from its name and its section in the config file.

        If you are writing your own constructor for your own crawler, make sure
        to call this constructor first (via super().__init__).

        NOTE(review): the previous docstring said this may throw a
        CrawlerLoadException; that type is not defined in this module - confirm.
        """

        self.name = name
        self.error_free = True

        # Options are read in the same order as before so that the first
        # invalid option reported stays the same.
        task_limit = section.max_concurrent_tasks()
        download_limit = section.max_concurrent_downloads()
        task_delay = section.delay_between_tasks()
        self._limiter = Limiter(
            task_limit=task_limit,
            download_limit=download_limit,
            task_delay=task_delay,
        )

        transform_rules = section.transform()
        self._transformer = Transformer(transform_rules)

        root = config.default_section.working_dir() / section.output_dir(name)
        redownload = section.redownload()
        on_conflict = section.on_conflict()
        self._output_dir = OutputDirectory(root, redownload, on_conflict)

    @asynccontextmanager
    async def crawl_bar(
            self,
            path: PurePath,
            total: Optional[int] = None,
    ) -> AsyncIterator[ProgressBar]:
        """
        Async context manager yielding a "Crawling <path>" progress bar.

        The bar is only created once the limiter's limit_crawl() context has
        been entered.
        """
        label = f"[bold bright_cyan]Crawling[/] {escape(str(path))}"
        async with self._limiter.limit_crawl():
            with log.crawl_bar(label, total=total) as progress:
                yield progress

    @asynccontextmanager
    async def download_bar(
            self,
            path: PurePath,
            total: Optional[int] = None,
    ) -> AsyncIterator[ProgressBar]:
        """
        Async context manager yielding a "Downloading <path>" progress bar.

        The bar is only created once the limiter's limit_download() context has
        been entered.
        """
        label = f"[bold bright_cyan]Downloading[/] {escape(str(path))}"
        async with self._limiter.limit_download():
            with log.download_bar(label, total=total) as progress:
                yield progress

    def should_crawl(self, path: PurePath) -> bool:
        """Return True unless the transformer maps `path` to None."""
        target = self._transformer.transform(path)
        return target is not None

    async def download(
            self,
            path: PurePath,
            mtime: Optional[datetime] = None,
            redownload: Optional[Redownload] = None,
            on_conflict: Optional[OnConflict] = None,
    ) -> Optional[AsyncContextManager[FileSink]]:
        """
        Transform `path` and ask the output directory to download to it.

        Returns None if the transformer maps `path` to None; otherwise returns
        whatever the output directory's download() returns.
        """
        target = self._transformer.transform(path)
        if target is not None:
            return await self._output_dir.download(target, mtime, redownload, on_conflict)

        return None

    async def cleanup(self) -> None:
        """Delegate to the output directory's cleanup()."""
        await self._output_dir.cleanup()

    async def run(self) -> None:
        """
        Start the crawling process. Call this function if you want to use a
        crawler.
        """

        # crawl() runs entirely inside the log's show_progress() context.
        with log.show_progress():
            await self.crawl()

    @abstractmethod
    async def crawl(self) -> None:
        """
        Overwrite this function if you are writing a crawler.

        This function must not return before all crawling is complete. To crawl
        multiple things concurrently, asyncio.gather can be used.
        """
2021-05-13 22:28:14 +02:00
class HttpCrawler(Crawler):
    """
    Crawler that runs with an aiohttp session and a cookie jar file.

    While run() is active the session is available as self.session.
    """

    COOKIE_FILE = PurePath(".cookies")

    def __init__(
            self,
            name: str,
            section: CrawlerSection,
            config: Config,
    ) -> None:
        """Set up the base crawler, the cookie file path and authentication state."""
        super().__init__(name, section, config)

        # The cookie file is resolved against the output directory and
        # registered there as a reserved path.
        self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE)
        self._output_dir.register_reserved(self.COOKIE_FILE)

        # Incremented after every successful authentication; see authenticate().
        self._authentication_id = 0
        self._authentication_lock = asyncio.Lock()

    async def prepare_request(self) -> int:
        """
        Return the current authentication id.

        The id can later be passed to authenticate().
        """
        # Taking the lock makes us wait for any authentication that is in
        # progress. This should reduce the number of requests we make: while an
        # authentication is running, all new requests wait for it to complete.
        async with self._authentication_lock:
            current_id = self._authentication_id

        return current_id

    async def authenticate(self, current_id: int) -> None:
        """
        Authenticate, unless that already happened since `current_id` was read.

        `current_id` is the value previously returned by prepare_request().
        """
        async with self._authentication_lock:
            # If the id changed, another caller successfully authenticated in
            # between and we must not authenticate again. We can assume that
            # authentication succeeded because _authenticate() throws if it
            # fails, in which case the id is not incremented.
            if current_id == self._authentication_id:
                await self._authenticate()
                self._authentication_id += 1

    async def _authenticate(self) -> None:
        """
        Performs authentication. This method must only return normally if
        authentication succeeded. In all other cases it must either retry
        internally or throw a terminal exception.
        """
        raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation")

    async def run(self) -> None:
        """
        Run the crawler inside an aiohttp session with persisted cookies.

        Cookies are loaded from the cookie file before the session starts
        (failures are ignored) and saved after the run (failures only print a
        warning).
        """
        jar = aiohttp.CookieJar()

        try:
            jar.load(self._cookie_jar_path)
        except Exception:
            # Best effort: if loading fails we simply start with an empty jar.
            pass

        user_agent = {"User-Agent": f"{NAME}/{VERSION}"}
        async with aiohttp.ClientSession(headers=user_agent, cookie_jar=jar) as http_session:
            self.session = http_session
            try:
                await super().run()
            finally:
                # The attribute must not outlive the session it refers to.
                del self.session

        try:
            jar.save(self._cookie_jar_path)
        except Exception:
            log.print(f"[bold red]Warning:[/] Failed to save cookies to {escape(str(self.COOKIE_FILE))}")
def repeat(attempts: int) -> Callable[[Wrapped], Wrapped]:
    """
    Deprecated. Build a decorator that retries a Crawler method.

    The decorated function is called up to `attempts` times. An Exception
    raised by any call but the last is discarded and the call is repeated; an
    exception raised by the last call propagates to the caller. The function
    is always called at least once, even if `attempts` is smaller than 1.

    The wrapper returns None and keeps the decorated function's metadata
    (name, docstring).
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    from functools import wraps

    def decorator(f: Wrapped) -> Wrapped:
        @wraps(f)
        def wrapper(self: "Crawler", *args: Any, **kwargs: Any) -> None:
            for _ in range(attempts - 1):
                try:
                    f(self, *args, **kwargs)
                    return
                except Exception:
                    # Deliberately ignored: a later attempt may still succeed.
                    pass

            # Final attempt: failures are no longer swallowed.
            f(self, *args, **kwargs)

        return wrapper  # type: ignore

    return decorator
def arepeat(attempts: int) -> Callable[[AWrapped], AWrapped]:
    """
    Deprecated. Build a decorator that retries an async Crawler method.

    The decorated coroutine function is awaited up to `attempts` times. An
    Exception raised by any attempt but the last is discarded and the call is
    repeated; an exception raised by the last attempt propagates to the
    caller. The function is always awaited at least once, even if `attempts`
    is smaller than 1.

    The wrapper returns None and keeps the decorated function's metadata
    (name, docstring).
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    from functools import wraps

    def decorator(f: AWrapped) -> AWrapped:
        @wraps(f)
        async def wrapper(self: "Crawler", *args: Any, **kwargs: Any) -> None:
            for _ in range(attempts - 1):
                try:
                    await f(self, *args, **kwargs)
                    return
                except Exception:
                    # Deliberately ignored: a later attempt may still succeed.
                    pass

            # Final attempt: failures are no longer swallowed.
            await f(self, *args, **kwargs)

        return wrapper  # type: ignore

    return decorator