mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Remove limiter
This commit is contained in:
parent
10e1a5e871
commit
1ca10571f0
@ -9,7 +9,6 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Ty
|
|||||||
from ..auth import Authenticator
|
from ..auth import Authenticator
|
||||||
from ..config import Config, Section
|
from ..config import Config, Section
|
||||||
from ..deduplicator import Deduplicator
|
from ..deduplicator import Deduplicator
|
||||||
from ..limiter import Limiter
|
|
||||||
from ..logging import ProgressBar, log
|
from ..logging import ProgressBar, log
|
||||||
from ..output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload
|
from ..output_dir import FileSink, FileSinkToken, OnConflict, OutputDirectory, OutputDirError, Redownload
|
||||||
from ..report import MarkConflictError, MarkDuplicateError, Report
|
from ..report import MarkConflictError, MarkDuplicateError, Report
|
||||||
@ -98,10 +97,9 @@ def anoncritical(f: AWrapped) -> AWrapped:
|
|||||||
|
|
||||||
|
|
||||||
class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
|
class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
|
||||||
def __init__(self, limiter: Limiter, path: PurePath):
|
def __init__(self, path: PurePath):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self._limiter = limiter
|
|
||||||
self._path = path
|
self._path = path
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -110,17 +108,15 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]):
|
|||||||
|
|
||||||
async def _on_aenter(self) -> ProgressBar:
|
async def _on_aenter(self) -> ProgressBar:
|
||||||
self._stack.callback(lambda: log.status("[bold cyan]", "Crawled", fmt_path(self._path)))
|
self._stack.callback(lambda: log.status("[bold cyan]", "Crawled", fmt_path(self._path)))
|
||||||
await self._stack.enter_async_context(self._limiter.limit_crawl())
|
|
||||||
bar = self._stack.enter_context(log.crawl_bar("[bold bright_cyan]", "Crawling", fmt_path(self._path)))
|
bar = self._stack.enter_context(log.crawl_bar("[bold bright_cyan]", "Crawling", fmt_path(self._path)))
|
||||||
|
|
||||||
return bar
|
return bar
|
||||||
|
|
||||||
|
|
||||||
class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
|
class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
|
||||||
def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath):
|
def __init__(self, fs_token: FileSinkToken, path: PurePath):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self._limiter = limiter
|
|
||||||
self._fs_token = fs_token
|
self._fs_token = fs_token
|
||||||
self._path = path
|
self._path = path
|
||||||
|
|
||||||
@ -129,7 +125,6 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]):
|
|||||||
return self._path
|
return self._path
|
||||||
|
|
||||||
async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
|
async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]:
|
||||||
await self._stack.enter_async_context(self._limiter.limit_download())
|
|
||||||
sink = await self._stack.enter_async_context(self._fs_token)
|
sink = await self._stack.enter_async_context(self._fs_token)
|
||||||
# The "Downloaded ..." message is printed in the output dir, not here
|
# The "Downloaded ..." message is printed in the output dir, not here
|
||||||
bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading",
|
bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading",
|
||||||
@ -235,12 +230,6 @@ class Crawler(ABC):
|
|||||||
self.name = name
|
self.name = name
|
||||||
self.error_free = True
|
self.error_free = True
|
||||||
|
|
||||||
self._limiter = Limiter(
|
|
||||||
task_limit=section.tasks(),
|
|
||||||
download_limit=section.downloads(),
|
|
||||||
task_delay=section.task_delay(),
|
|
||||||
)
|
|
||||||
|
|
||||||
self._deduplicator = Deduplicator(section.windows_paths())
|
self._deduplicator = Deduplicator(section.windows_paths())
|
||||||
self._transformer = Transformer(section.transform())
|
self._transformer = Transformer(section.transform())
|
||||||
|
|
||||||
@ -288,7 +277,7 @@ class Crawler(ABC):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
log.explain("Answer: Yes")
|
log.explain("Answer: Yes")
|
||||||
return CrawlToken(self._limiter, path)
|
return CrawlToken(path)
|
||||||
|
|
||||||
async def download(
|
async def download(
|
||||||
self,
|
self,
|
||||||
@ -313,7 +302,7 @@ class Crawler(ABC):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
log.explain("Answer: Yes")
|
log.explain("Answer: Yes")
|
||||||
return DownloadToken(self._limiter, fs_token, path)
|
return DownloadToken(fs_token, path)
|
||||||
|
|
||||||
async def _cleanup(self) -> None:
|
async def _cleanup(self) -> None:
|
||||||
log.explain_topic("Decision: Clean up files")
|
log.explain_topic("Decision: Clean up files")
|
||||||
|
@ -1,97 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import time
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import AsyncIterator, Optional
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class Slot:
    """Bookkeeping for a single concurrency slot handed out by ``Limiter``."""

    # True while some task currently holds this slot.
    active: bool = False
    # ``time.monotonic()`` reading taken when the slot was last released after
    # actually being used, or None if the slot has never been used.
    last_left: Optional[float] = None


class Limiter:
    """
    Limits how many crawl/download tasks run at once and spaces out reuse of
    each slot by a fixed delay.

    Every task (crawl or download) occupies one of ``task_limit`` slots. A
    download additionally consumes one unit of the download budget, so at most
    ``download_limit`` downloads hold a slot at the same time. After a slot is
    released, the next task that picks it up waits until ``task_delay``
    seconds have passed since the release.
    """

    def __init__(
            self,
            task_limit: int,
            download_limit: int,
            task_delay: float
    ):
        """
        Raises ValueError if ``task_limit`` or ``download_limit`` is not
        positive, if ``download_limit`` exceeds ``task_limit``, or if
        ``task_delay`` is negative.
        """

        if task_limit <= 0:
            raise ValueError("task limit must be at least 1")
        if download_limit <= 0:
            raise ValueError("download limit must be at least 1")
        if download_limit > task_limit:
            raise ValueError("download limit can't be greater than task limit")
        if task_delay < 0:
            raise ValueError("Task delay must not be negative")

        self._slots = [Slot() for _ in range(task_limit)]
        # Number of downloads that may still be started right now.
        self._downloads = download_limit
        self._delay = task_delay

        # Guards _slots and _downloads; waiters are woken on every release.
        self._condition = asyncio.Condition()

    def _acquire_slot(self) -> Optional[Slot]:
        """Mark the first inactive slot as active and return it, or None if all are busy."""

        for slot in self._slots:
            if not slot.active:
                slot.active = True
                return slot

        return None

    async def _wait_for_slot_delay(self, slot: Slot) -> None:
        """Sleep until ``task_delay`` seconds have passed since the slot was last released."""

        if slot.last_left is not None:
            # Monotonic clock: unaffected by system clock adjustments.
            delay = slot.last_left + self._delay - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)

    def _release_slot(self, slot: Slot) -> None:
        """Record the release time of a used slot and mark it inactive."""

        slot.last_left = time.monotonic()
        slot.active = False

    @asynccontextmanager
    async def _limit(self, download: bool) -> AsyncIterator[None]:
        """
        Shared implementation of ``limit_crawl`` and ``limit_download``.

        Waits for a free slot (and, if ``download`` is true, for download
        budget), waits out the slot's delay, runs the body, then releases
        everything that was taken.
        """

        slot: Slot
        async with self._condition:
            while True:
                if not download or self._downloads > 0:
                    if found_slot := self._acquire_slot():
                        slot = found_slot
                        if download:
                            self._downloads -= 1
                        break

                await self._condition.wait()

        # From here on the slot (and download budget) is held, so everything,
        # including the delay wait, must sit inside the try/finally. Otherwise a
        # task cancelled while sleeping would leak the slot forever.
        entered = False
        try:
            await self._wait_for_slot_delay(slot)
            entered = True
            yield
        finally:
            async with self._condition:
                if entered:
                    self._release_slot(slot)
                else:
                    # The body never ran, so keep the old release timestamp
                    # instead of restarting the delay for the next user.
                    slot.active = False
                if download:
                    self._downloads += 1
                self._condition.notify_all()

    @asynccontextmanager
    async def limit_crawl(self) -> AsyncIterator[None]:
        """Hold one task slot for the duration of the ``async with`` body."""

        async with self._limit(download=False):
            yield

    @asynccontextmanager
    async def limit_download(self) -> AsyncIterator[None]:
        """Hold one task slot and one unit of download budget for the body."""

        async with self._limit(download=True):
            yield
|
|
Loading…
Reference in New Issue
Block a user