Change limiter logic

Now download tasks are a subset of all tasks.
This commit is contained in:
Joscha 2021-05-15 13:21:38 +02:00
parent b0f9e1e8b4
commit acd674f0a0
3 changed files with 85 additions and 54 deletions

View File

@ -64,17 +64,14 @@ crawlers:
remote file is different.
- `transform`: Rules for renaming and excluding certain files and directories.
For more details, see [this section](#transformation-rules). (Default: empty)
- `max_concurrent_crawls`: The maximum number of concurrent crawl actions. What
constitutes a crawl action might vary from crawler to crawler, but it usually
means an HTTP request of a page to analyze. (Default: 1)
- `max_concurrent_downloads`: The maximum number of concurrent download actions.
What constitutes a download action might vary from crawler to crawler, but it
usually means an HTTP request for a single file. (Default: 1)
- `request_delay`: Time (in seconds) that the crawler should wait between
subsequent requests. Can be used to avoid unnecessary strain for the crawl
target. Crawl and download actions are handled separately, meaning that a
download action might immediately follow a crawl action even if this is set to
a nonzero value. (Default: 0)
- `max_concurrent_tasks`: The maximum number of concurrent tasks (such as
crawling or downloading). (Default: 1)
- `max_concurrent_downloads`: How many of those tasks can be download tasks at
the same time. Must not be greater than `max_concurrent_tasks`. When not set,
this is the same as `max_concurrent_tasks`. (Optional)
- `delay_between_tasks`: Time (in seconds) that the crawler should wait between
subsequent tasks. Can be used as a sort of rate limit to avoid unnecessary
load for the crawl target. (Default: 0.0)
Some crawlers may also require credentials for authentication. To configure how
the crawler obtains its credentials, the `auth` option is used. It is set to the

View File

@ -149,26 +149,31 @@ class CrawlerSection(Section):
def transform(self) -> str:
return self.s.get("transform", "")
def max_concurrent_crawls(self) -> int:
value = self.s.getint("max_concurrent_crawls", fallback=1)
def max_concurrent_tasks(self) -> int:
value = self.s.getint("max_concurrent_tasks", fallback=1)
if value <= 0:
self.invalid_value("max_concurrent_crawls", value,
self.invalid_value("max_concurrent_tasks", value,
"Must be greater than 0")
return value
def max_concurrent_downloads(self) -> int:
value = self.s.getint("max_concurrent_downloads", fallback=1)
tasks = self.max_concurrent_tasks()
value = self.s.getint("max_concurrent_downloads", fallback=None)
if value is None:
return tasks
if value <= 0:
self.invalid_value("max_concurrent_downloads", value,
"Must be greater than 0")
if value > tasks:
self.invalid_value("max_concurrent_downloads", value,
"Must not be greater than max_concurrent_tasks")
return value
def request_delay(self) -> float:
value = self.s.getfloat("request_delay", fallback=0.0)
def delay_between_tasks(self) -> float:
value = self.s.getfloat("delay_between_tasks", fallback=0.0)
if value < 0:
self.invalid_value("request_delay", value,
"Must be greater than or equal to 0")
self.invalid_value("delay_between_tasks", value,
"Must not be negative")
return value
def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator:
@ -203,9 +208,9 @@ class Crawler(ABC):
self.error_free = True
self._limiter = Limiter(
crawl_limit=section.max_concurrent_crawls(),
task_limit=section.max_concurrent_tasks(),
download_limit=section.max_concurrent_downloads(),
delay=section.request_delay(),
task_delay=section.delay_between_tasks(),
)
try:

View File

@ -2,7 +2,7 @@ import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncContextManager, AsyncIterator, Optional
from typing import AsyncIterator, Optional
@dataclass
@ -11,15 +11,27 @@ class Slot:
last_left: Optional[float] = None
class SlotPool:
def __init__(self, limit: int, delay: float):
if limit <= 0:
raise ValueError("limit must be greater than 0")
class Limiter:
def __init__(
self,
task_limit: int,
download_limit: int,
task_delay: float
):
if task_limit <= 0:
raise ValueError("task limit must be at least 1")
if download_limit <= 0:
raise ValueError("download limit must be at least 1")
if download_limit > task_limit:
raise ValueError("download limit can't be greater than task limit")
if task_delay < 0:
raise ValueError("Task delay must not be negative")
self._slots = [Slot() for _ in range(limit)]
self._delay = delay
self._slots = [Slot() for _ in range(task_limit)]
self._downloads = download_limit
self._delay = task_delay
self._free = asyncio.Condition()
self._condition = asyncio.Condition()
def _acquire_slot(self) -> Optional[Slot]:
for slot in self._slots:
@ -29,40 +41,57 @@ class SlotPool:
return None
def _release_slot(self, slot: Slot) -> None:
slot.last_left = time.time()
slot.active = False
@asynccontextmanager
async def limit(self) -> AsyncIterator[None]:
slot: Slot
async with self._free:
while True:
if found_slot := self._acquire_slot():
slot = found_slot
break
await self._free.wait()
async def _wait_for_slot_delay(self, slot: Slot) -> None:
if slot.last_left is not None:
delay = slot.last_left + self._delay - time.time()
if delay > 0:
await asyncio.sleep(delay)
def _release_slot(self, slot: Slot) -> None:
slot.last_left = time.time()
slot.active = False
@asynccontextmanager
async def limit_crawl(self) -> AsyncIterator[None]:
slot: Slot
async with self._condition:
while True:
if found_slot := self._acquire_slot():
slot = found_slot
break
await self._condition.wait()
await self._wait_for_slot_delay(slot)
try:
yield
finally:
async with self._free:
async with self._condition:
self._release_slot(slot)
self._free.notify()
self._condition.notify_all()
@asynccontextmanager
async def limit_download(self) -> AsyncIterator[None]:
slot: Slot
async with self._condition:
while True:
if self._downloads <= 0:
await self._condition.wait()
continue
class Limiter:
def __init__(self, crawl_limit: int, download_limit: int, delay: float):
self._crawl_pool = SlotPool(crawl_limit, delay)
self._download_pool = SlotPool(download_limit, delay)
if found_slot := self._acquire_slot():
slot = found_slot
self._downloads -= 1
break
def limit_crawl(self) -> AsyncContextManager[None]:
return self._crawl_pool.limit()
await self._condition.wait()
def limit_download(self) -> AsyncContextManager[None]:
return self._crawl_pool.limit()
await self._wait_for_slot_delay(slot)
try:
yield
finally:
async with self._condition:
self._release_slot(slot)
self._downloads += 1
self._condition.notify_all()