From acd674f0a076fba8bfaf64b90bfc3000d3f5cb73 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 15 May 2021 13:21:38 +0200 Subject: [PATCH] Change limiter logic Now download tasks are a subset of all tasks. --- CONFIG.md | 19 +++++----- PFERD/crawler.py | 27 ++++++++------ PFERD/limiter.py | 93 +++++++++++++++++++++++++++++++----------------- 3 files changed, 85 insertions(+), 54 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index a74eef3..2338d8f 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -64,17 +64,14 @@ crawlers: remote file is different. - `transform`: Rules for renaming and excluding certain files and directories. For more details, see [this section](#transformation-rules). (Default: empty) -- `max_concurrent_crawls`: The maximum number of concurrent crawl actions. What - constitutes a crawl action might vary from crawler to crawler, but it usually - means an HTTP request of a page to analyze. (Default: 1) -- `max_concurrent_downloads`: The maximum number of concurrent download actions. - What constitutes a download action might vary from crawler to crawler, but it - usually means an HTTP request for a single file. (Default: 1) -- `request_delay`: Time (in seconds) that the crawler should wait between - subsequent requests. Can be used to avoid unnecessary strain for the crawl - target. Crawl and download actions are handled separately, meaning that a - download action might immediately follow a crawl action even if this is set to - a nonzero value. (Default: 0) +- `max_concurrent_tasks`: The maximum number of concurrent tasks (such as + crawling or downloading). (Default: 1) +- `max_concurrent_downloads`: How many of those tasks can be download tasks at + the same time. Must not be greater than `max_concurrent_tasks`. When not set, + this is the same as `max_concurrent_tasks`. (Optional) +- `delay_between_tasks`: Time (in seconds) that the crawler should wait between + subsequent tasks. Can be used as a sort of rate limit to avoid unnecessary + load for the crawl target. (Default: 0.0) Some crawlers may also require credentials for authentication. To configure how the crawler obtains its credentials, the `auth` option is used. It is set to the diff --git a/PFERD/crawler.py b/PFERD/crawler.py index 48dfcb4..9ec5991 100644 --- a/PFERD/crawler.py +++ b/PFERD/crawler.py @@ -149,26 +149,31 @@ class CrawlerSection(Section): def transform(self) -> str: return self.s.get("transform", "") - def max_concurrent_crawls(self) -> int: - value = self.s.getint("max_concurrent_crawls", fallback=1) + def max_concurrent_tasks(self) -> int: + value = self.s.getint("max_concurrent_tasks", fallback=1) if value <= 0: - self.invalid_value("max_concurrent_crawls", value, + self.invalid_value("max_concurrent_tasks", value, "Must be greater than 0") return value def max_concurrent_downloads(self) -> int: - value = self.s.getint("max_concurrent_downloads", fallback=1) - + tasks = self.max_concurrent_tasks() + value = self.s.getint("max_concurrent_downloads", fallback=None) + if value is None: + return tasks if value <= 0: self.invalid_value("max_concurrent_downloads", value, "Must be greater than 0") + if value > tasks: + self.invalid_value("max_concurrent_downloads", value, + "Must not be greater than max_concurrent_tasks") return value - def request_delay(self) -> float: - value = self.s.getfloat("request_delay", fallback=0.0) + def delay_between_tasks(self) -> float: + value = self.s.getfloat("delay_between_tasks", fallback=0.0) if value < 0: - self.invalid_value("request_delay", value, - "Must be greater than or equal to 0") + self.invalid_value("delay_between_tasks", value, + "Must not be negative") return value def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: @@ -203,9 +208,9 @@ class Crawler(ABC): self.error_free = True self._limiter = Limiter( - crawl_limit=section.max_concurrent_crawls(), + task_limit=section.max_concurrent_tasks(), download_limit=section.max_concurrent_downloads(), - delay=section.request_delay(), + task_delay=section.delay_between_tasks(), ) try: diff --git a/PFERD/limiter.py b/PFERD/limiter.py index 6359221..3122a7a 100644 --- a/PFERD/limiter.py +++ b/PFERD/limiter.py @@ -2,7 +2,7 @@ import asyncio import time from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import AsyncContextManager, AsyncIterator, Optional +from typing import AsyncIterator, Optional @dataclass @@ -11,15 +11,27 @@ class Slot: last_left: Optional[float] = None -class SlotPool: - def __init__(self, limit: int, delay: float): - if limit <= 0: - raise ValueError("limit must be greater than 0") +class Limiter: + def __init__( + self, + task_limit: int, + download_limit: int, + task_delay: float + ): + if task_limit <= 0: + raise ValueError("task limit must be at least 1") + if download_limit <= 0: + raise ValueError("download limit must be at least 1") + if download_limit > task_limit: + raise ValueError("download limit can't be greater than task limit") + if task_delay < 0: + raise ValueError("Task delay must not be negative") - self._slots = [Slot() for _ in range(limit)] - self._delay = delay + self._slots = [Slot() for _ in range(task_limit)] + self._downloads = download_limit + self._delay = task_delay - self._free = asyncio.Condition() + self._condition = asyncio.Condition() def _acquire_slot(self) -> Optional[Slot]: for slot in self._slots: @@ -29,40 +41,57 @@ class SlotPool: return None - def _release_slot(self, slot: Slot) -> None: - slot.last_left = time.time() - slot.active = False - - @asynccontextmanager - async def limit(self) -> AsyncIterator[None]: - slot: Slot - async with self._free: - while True: - if found_slot := self._acquire_slot(): - slot = found_slot - break - await self._free.wait() - + async def _wait_for_slot_delay(self, slot: Slot) -> None: if slot.last_left is not None: delay = slot.last_left + self._delay - time.time() if delay > 0: await asyncio.sleep(delay) + def _release_slot(self, slot: Slot) -> None: + slot.last_left = time.time() + slot.active = False + + @asynccontextmanager + async def limit_crawl(self) -> AsyncIterator[None]: + slot: Slot + async with self._condition: + while True: + if found_slot := self._acquire_slot(): + slot = found_slot + break + await self._condition.wait() + + await self._wait_for_slot_delay(slot) + try: yield finally: - async with self._free: + async with self._condition: self._release_slot(slot) - self._free.notify() + self._condition.notify_all() + @asynccontextmanager + async def limit_download(self) -> AsyncIterator[None]: + slot: Slot + async with self._condition: + while True: + if self._downloads <= 0: + await self._condition.wait() + continue -class Limiter: - def __init__(self, crawl_limit: int, download_limit: int, delay: float): - self._crawl_pool = SlotPool(crawl_limit, delay) - self._download_pool = SlotPool(download_limit, delay) + if found_slot := self._acquire_slot(): + slot = found_slot + self._downloads -= 1 + break - def limit_crawl(self) -> AsyncContextManager[None]: - return self._crawl_pool.limit() + await self._condition.wait() - def limit_download(self) -> AsyncContextManager[None]: - return self._crawl_pool.limit() + await self._wait_for_slot_delay(slot) + + try: + yield + finally: + async with self._condition: + self._release_slot(slot) + self._downloads += 1 + self._condition.notify_all()