Replace asyncio.gather with custom Crawler function

Joscha 2021-05-23 17:25:16 +02:00
parent c0cecf8363
commit 29d5a40c57
3 changed files with 24 additions and 5 deletions
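
A note on the motivation (not part of the commit itself): with plain asyncio.gather, the first exception is re-raised to the caller, but the remaining awaitables keep running in the background, so an error such as CrawlError does not actually stop the rest of the crawl. A minimal, self-contained sketch of that behavior, with invented function names:

import asyncio


async def fails_fast() -> None:
    raise RuntimeError("simulated CrawlError")


async def keeps_running() -> None:
    await asyncio.sleep(1)
    print("sibling finished anyway")  # still printed after the failure


async def main() -> None:
    try:
        await asyncio.gather(fails_fast(), keeps_running())
    except RuntimeError:
        # The exception arrives here immediately, but keeps_running() is
        # neither cancelled nor awaited; it continues in the background.
        pass
    await asyncio.sleep(2)  # "sibling finished anyway" appears here


asyncio.run(main())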

View File

@@ -1,7 +1,8 @@
import asyncio
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, TypeVar
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple, TypeVar
from rich.markup import escape
@@ -228,6 +229,25 @@ class Crawler(ABC):
section.on_conflict(),
)

    @staticmethod
    async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
        """
        Similar to asyncio.gather. However, in the case of an exception, all
        still running tasks are cancelled and the exception is rethrown.

        This should always be preferred over asyncio.gather in crawler code so
        that an exception like CrawlError may actually stop the crawler.
        """

        tasks = [asyncio.ensure_future(aw) for aw in awaitables]
        result = asyncio.gather(*tasks)
        try:
            return await result
        except:  # noqa: E722
            for task in tasks:
                task.cancel()
            raise

    async def crawl(self, path: PurePath) -> Optional[CrawlToken]:
        log.explain_topic(f"Decision: Crawl {fmt_path(path)}")
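
For illustration only (not part of the diff): a standalone sketch that mirrors the helper above and shows the difference in behavior, namely that a failing child now cancels its still-running siblings before the exception reaches the caller. All names besides gather itself are invented:

import asyncio
from typing import Any, Awaitable, List, Sequence


async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
    # Standalone copy of the cancel-on-error pattern from the method above.
    tasks = [asyncio.ensure_future(aw) for aw in awaitables]
    try:
        return await asyncio.gather(*tasks)
    except:  # noqa: E722
        for task in tasks:
            task.cancel()
        raise


async def fails_fast() -> None:
    raise RuntimeError("simulated CrawlError")


async def long_running() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("sibling was cancelled")
        raise


async def main() -> None:
    try:
        await gather([fails_fast(), long_running()])
    except RuntimeError:
        print("exception reached the caller")
    await asyncio.sleep(0)  # let the cancelled sibling process its CancelledError


asyncio.run(main())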

View File

@@ -1,4 +1,3 @@
import asyncio
import re
from pathlib import PurePath
from typing import Any, Awaitable, Callable, Dict, Optional, Set, TypeVar, Union
@@ -215,7 +214,7 @@ class KitIliasWebCrawler(HttpCrawler):
# this method without having spawned a single task. Due to this we do
# not need to cancel anything or worry about this gather call or the forks
# further up.
await asyncio.gather(*tasks)
await self.gather(tasks)
await impl()
@@ -240,7 +239,7 @@ class KitIliasWebCrawler(HttpCrawler):
# this method without having spawned a single task. Due to this we do
# not need to cancel anything or worry about this gather call or the forks
# further up.
await asyncio.gather(*tasks)
await self.gather(tasks)
await impl()
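
A small aside on the call sites above and below (an observation about the diff, not new behavior): the helper accepts a single sequence instead of variadic awaitables, so asyncio.gather(*tasks) becomes self.gather(tasks) with the unpacking star dropped. A hedged, standalone illustration of that calling convention, with invented names:

import asyncio
from typing import Any, Awaitable, List, Sequence


async def sequence_gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
    # Stand-in with the same signature as the Crawler.gather helper: it takes
    # one sequence, while asyncio.gather takes its awaitables variadically.
    return await asyncio.gather(*awaitables)


async def child(i: int) -> int:
    await asyncio.sleep(0)
    return i


async def main() -> None:
    tasks = [child(i) for i in range(3)]
    print(await sequence_gather(tasks))  # called with the list, not unpacked


asyncio.run(main())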

View File

@@ -83,7 +83,7 @@ class LocalCrawler(Crawler):
pure_child = pure / child.name
tasks.append(self._crawl_path(child, pure_child))
await asyncio.gather(*tasks)
await self.gather(tasks)
async def _crawl_file(self, path: Path, pure: PurePath) -> None:
stat = path.stat()