From 296a169dd30e68a679624b2a53ef516281a51a0d Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 15 May 2021 00:38:46 +0200 Subject: [PATCH] Make limiter logic more complex The limiter can now distinguish between crawl and download actions and has a fancy slot system and delay logic. --- CONFIG.md | 11 ++++++++ PFERD/config.py | 12 +++++++-- PFERD/crawler.py | 65 +++++++++++++++++++++++++++++++++--------------- PFERD/limiter.py | 65 ++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 126 insertions(+), 27 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index 2cac906..a74eef3 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -64,6 +64,17 @@ crawlers: remote file is different. - `transform`: Rules for renaming and excluding certain files and directories. For more details, see [this section](#transformation-rules). (Default: empty) +- `max_concurrent_crawls`: The maximum number of concurrent crawl actions. What + constitutes a crawl action might vary from crawler to crawler, but it usually + means an HTTP request of a page to analyze. (Default: 1) +- `max_concurrent_downloads`: The maximum number of concurrent download actions. + What constitutes a download action might vary from crawler to crawler, but it + usually means an HTTP request for a single file. (Default: 1) +- `request_delay`: Time (in seconds) that the crawler should wait between + subsequent requests. Can be used to avoid unnecessary strain for the crawl + target. Crawl and download actions are handled separately, meaning that a + download action might immediately follow a crawl action even if this is set to + a nonzero value. (Default: 0) Some crawlers may also require credentials for authentication. To configure how the crawler obtains its credentials, the `auth` option is used. 
It is set to the diff --git a/PFERD/config.py b/PFERD/config.py index 56ea9af..0520f74 100644 --- a/PFERD/config.py +++ b/PFERD/config.py @@ -33,8 +33,16 @@ class Section: def error(self, key: str, desc: str) -> NoReturn: raise ConfigFormatException(self.s.name, key, desc) - def invalid_value(self, key: str, value: Any) -> NoReturn: - self.error(key, f"Invalid value: {value!r}") + def invalid_value( + self, + key: str, + value: Any, + reason: Optional[str], + ) -> NoReturn: + if reason is None: + self.error(key, f"Invalid value {value!r}") + else: + self.error(key, f"Invalid value {value!r}: {reason}") def missing_value(self, key: str) -> NoReturn: self.error(key, "Missing value") diff --git a/PFERD/crawler.py b/PFERD/crawler.py index ece62c1..f506294 100644 --- a/PFERD/crawler.py +++ b/PFERD/crawler.py @@ -139,6 +139,28 @@ class CrawlerSection(Section): def transform(self) -> str: return self.s.get("transform", "") + def max_concurrent_crawls(self) -> int: + value = self.s.getint("max_concurrent_crawls", fallback=1) + if value <= 0: + self.invalid_value("max_concurrent_crawls", value, + "Must be greater than 0") + return value + + def max_concurrent_downloads(self) -> int: + value = self.s.getint("max_concurrent_downloads", fallback=1) + + if value <= 0: + self.invalid_value("max_concurrent_downloads", value, + "Must be greater than 0") + return value + + def request_delay(self) -> float: + value = self.s.getfloat("request_delay", fallback=0.0) + if value < 0: + self.invalid_value("request_delay", value, + "Must be greater than or equal to 0") + return value + def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: value = self.s.get("auth") if value is None: @@ -168,9 +190,14 @@ class Crawler(ABC): self.name = name self._conductor = conductor - self._limiter = Limiter() self.error_free = True + self._limiter = Limiter( + crawl_limit=section.max_concurrent_crawls(), + download_limit=section.max_concurrent_downloads(), + 
delay=section.request_delay(), + ) + try: self._transformer = Transformer(section.transform()) except RuleParseException as e: @@ -210,28 +237,26 @@ class Crawler(ABC): return self._conductor.exclusive_output() @asynccontextmanager - async def progress_bar( - self, - desc: str, - total: Optional[int] = None, - ) -> AsyncIterator[ProgressBar]: - async with self._limiter.limit(): - with self._conductor.progress_bar(desc, total=total) as bar: - yield bar - - def crawl_bar(self, path: PurePath) -> AsyncContextManager[ProgressBar]: - pathstr = escape(str(path)) - desc = f"[bold magenta]Crawling[/bold magenta] {pathstr}" - return self.progress_bar(desc) - - def download_bar( + async def crawl_bar( self, path: PurePath, total: Optional[int] = None, - ) -> AsyncContextManager[ProgressBar]: - pathstr = escape(str(path)) - desc = f"[bold green]Downloading[/bold green] {pathstr}" - return self.progress_bar(desc, total=total) + ) -> AsyncIterator[ProgressBar]: + desc = f"[bold bright_cyan]Crawling[/] {escape(str(path))}" + async with self._limiter.limit_crawl(): + with self._conductor.progress_bar(desc, total=total) as bar: + yield bar + + @asynccontextmanager + async def download_bar( + self, + path: PurePath, + total: Optional[int] = None, + ) -> AsyncIterator[ProgressBar]: + desc = f"[bold bright_cyan]Downloading[/] {escape(str(path))}" + async with self._limiter.limit_download(): + with self._conductor.progress_bar(desc, total=total) as bar: + yield bar async def download( self, diff --git a/PFERD/limiter.py b/PFERD/limiter.py index ae72fe6..6359221 100644 --- a/PFERD/limiter.py +++ b/PFERD/limiter.py @@ -1,13 +1,68 @@ import asyncio +import time from contextlib import asynccontextmanager -from typing import AsyncIterator +from dataclasses import dataclass +from typing import AsyncContextManager, AsyncIterator, Optional -class Limiter: - def __init__(self, limit: int = 10): - self._semaphore = asyncio.Semaphore(limit) +@dataclass +class Slot: + active: bool = False + 
last_left: Optional[float] = None + + +class SlotPool: + def __init__(self, limit: int, delay: float): + if limit <= 0: + raise ValueError("limit must be greater than 0") + + self._slots = [Slot() for _ in range(limit)] + self._delay = delay + + self._free = asyncio.Condition() + + def _acquire_slot(self) -> Optional[Slot]: + for slot in self._slots: + if not slot.active: + slot.active = True + return slot + + return None + + def _release_slot(self, slot: Slot) -> None: + slot.last_left = time.time() + slot.active = False @asynccontextmanager async def limit(self) -> AsyncIterator[None]: - async with self._semaphore: + slot: Slot + async with self._free: + while True: + if found_slot := self._acquire_slot(): + slot = found_slot + break + await self._free.wait() + + if slot.last_left is not None: + delay = slot.last_left + self._delay - time.time() + if delay > 0: + await asyncio.sleep(delay) + + try: yield + finally: + async with self._free: + self._release_slot(slot) + self._free.notify() + + +class Limiter: + def __init__(self, crawl_limit: int, download_limit: int, delay: float): + self._crawl_pool = SlotPool(crawl_limit, delay) + self._download_pool = SlotPool(download_limit, delay) + + def limit_crawl(self) -> AsyncContextManager[None]: + return self._crawl_pool.limit() + + def limit_download(self) -> AsyncContextManager[None]: + return self._download_pool.limit()