diff --git a/PFERD/crawl/crawler.py b/PFERD/crawl/crawler.py
index 0e67c02..169968e 100644
--- a/PFERD/crawl/crawler.py
+++ b/PFERD/crawl/crawler.py
@@ -9,7 +9,6 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Ty
 from ..auth import Authenticator
 from ..config import Config, Section
 from ..deduplicator import Deduplicator
-from ..limiter import Limiter
 from ..logging import ProgressBar, log
 from ..output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload
 from ..report import MarkConflictError, MarkDuplicateError, Report
@@ -98,10 +97,9 @@ def anoncritical(f: AWrapped) -> AWrapped:
 
 
 class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
-    def __init__(self, limiter: Limiter, path: PurePath):
+    def __init__(self, path: PurePath):
         super().__init__()
 
-        self._limiter = limiter
         self._path = path
 
     @property
@@ -110,17 +108,15 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
 
     async def _on_aenter(self) -> ProgressBar:
         self._stack.callback(lambda: log.status("[bold cyan]", "Crawled", fmt_path(self._path)))
-        await self._stack.enter_async_context(self._limiter.limit_crawl())
         bar = self._stack.enter_context(log.crawl_bar("[bold bright_cyan]", "Crawling", fmt_path(self._path)))
 
         return bar
 
 
 class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
-    def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath):
+    def __init__(self, fs_token: FileSinkToken, path: PurePath):
         super().__init__()
 
-        self._limiter = limiter
         self._fs_token = fs_token
         self._path = path
 
@@ -129,7 +125,6 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
         return self._path
 
     async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
-        await self._stack.enter_async_context(self._limiter.limit_download())
         sink = await self._stack.enter_async_context(self._fs_token)
         # The "Downloaded ..." message is printed in the output dir, not here
         bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading",
@@ -235,12 +230,6 @@ class Crawler(ABC):
         self.name = name
         self.error_free = True
 
-        self._limiter = Limiter(
-            task_limit=section.tasks(),
-            download_limit=section.downloads(),
-            task_delay=section.task_delay(),
-        )
-
         self._deduplicator = Deduplicator(section.windows_paths())
         self._transformer = Transformer(section.transform())
 
@@ -288,7 +277,7 @@ class Crawler(ABC):
             return None
 
         log.explain("Answer: Yes")
-        return CrawlToken(self._limiter, path)
+        return CrawlToken(path)
 
     async def download(
         self,
@@ -313,7 +302,7 @@ class Crawler(ABC):
             return None
 
         log.explain("Answer: Yes")
-        return DownloadToken(self._limiter, fs_token, path)
+        return DownloadToken(fs_token, path)
 
     async def _cleanup(self) -> None:
         log.explain_topic("Decision: Clean up files")
diff --git a/PFERD/limiter.py b/PFERD/limiter.py
deleted file mode 100644
index 3122a7a..0000000
--- a/PFERD/limiter.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import asyncio
-import time
-from contextlib import asynccontextmanager
-from dataclasses import dataclass
-from typing import AsyncIterator, Optional
-
-
-@dataclass
-class Slot:
-    active: bool = False
-    last_left: Optional[float] = None
-
-
-class Limiter:
-    def __init__(
-            self,
-            task_limit: int,
-            download_limit: int,
-            task_delay: float
-    ):
-        if task_limit <= 0:
-            raise ValueError("task limit must be at least 1")
-        if download_limit <= 0:
-            raise ValueError("download limit must be at least 1")
-        if download_limit > task_limit:
-            raise ValueError("download limit can't be greater than task limit")
-        if task_delay < 0:
-            raise ValueError("Task delay must not be negative")
-
-        self._slots = [Slot() for _ in range(task_limit)]
-        self._downloads = download_limit
-        self._delay = task_delay
-
-        self._condition = asyncio.Condition()
-
-    def _acquire_slot(self) -> Optional[Slot]:
-        for slot in self._slots:
-            if not slot.active:
-                slot.active = True
-                return slot
-
-        return None
-
-    async def _wait_for_slot_delay(self, slot: Slot) -> None:
-        if slot.last_left is not None:
-            delay = slot.last_left + self._delay - time.time()
-            if delay > 0:
-                await asyncio.sleep(delay)
-
-    def _release_slot(self, slot: Slot) -> None:
-        slot.last_left = time.time()
-        slot.active = False
-
-    @asynccontextmanager
-    async def limit_crawl(self) -> AsyncIterator[None]:
-        slot: Slot
-        async with self._condition:
-            while True:
-                if found_slot := self._acquire_slot():
-                    slot = found_slot
-                    break
-                await self._condition.wait()
-
-        await self._wait_for_slot_delay(slot)
-
-        try:
-            yield
-        finally:
-            async with self._condition:
-                self._release_slot(slot)
-                self._condition.notify_all()
-
-    @asynccontextmanager
-    async def limit_download(self) -> AsyncIterator[None]:
-        slot: Slot
-        async with self._condition:
-            while True:
-                if self._downloads <= 0:
-                    await self._condition.wait()
-                    continue
-
-                if found_slot := self._acquire_slot():
-                    slot = found_slot
-                    self._downloads -= 1
-                    break
-
-                await self._condition.wait()
-
-        await self._wait_for_slot_delay(slot)
-
-        try:
-            yield
-        finally:
-            async with self._condition:
-                self._release_slot(slot)
-                self._downloads += 1
-                self._condition.notify_all()
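
For context on what the deleted module provided, here is a minimal, self-contained usage sketch (not part of the patch). It assumes the pre-deletion module is still importable as PFERD.limiter; the fake_crawl/fake_download helpers and the concrete limits are invented for illustration, while Limiter(task_limit=..., download_limit=..., task_delay=...), limit_crawl() and limit_download() come from the code shown above.

import asyncio

from PFERD.limiter import Limiter  # the module removed by this patch


async def fake_crawl(limiter: Limiter, i: int) -> None:
    # At most task_limit coroutines hold a slot at once; each slot enforces
    # task_delay seconds between the previous occupant leaving and the next
    # occupant starting its work.
    async with limiter.limit_crawl():
        await asyncio.sleep(0.1)
        print(f"crawled page {i}")


async def fake_download(limiter: Limiter, i: int) -> None:
    # Downloads also take a slot, but are additionally capped by download_limit.
    async with limiter.limit_download():
        await asyncio.sleep(0.1)
        print(f"downloaded file {i}")


async def main() -> None:
    limiter = Limiter(task_limit=4, download_limit=2, task_delay=0.5)
    await asyncio.gather(
        *(fake_crawl(limiter, i) for i in range(8)),
        *(fake_download(limiter, i) for i in range(4)),
    )


asyncio.run(main())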