From 29d5a40c570ac21b7bd73fee64134e6c79216301 Mon Sep 17 00:00:00 2001
From: Joscha
Date: Sun, 23 May 2021 17:25:16 +0200
Subject: [PATCH] Replace asyncio.gather with custom Crawler function

---
 PFERD/crawler.py                              | 22 ++++++++++++++++++-
 PFERD/crawlers/ilias/kit_ilias_web_crawler.py |  5 ++---
 PFERD/crawlers/local.py                       |  2 +-
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/PFERD/crawler.py b/PFERD/crawler.py
index 749510c..e3aef8f 100644
--- a/PFERD/crawler.py
+++ b/PFERD/crawler.py
@@ -1,7 +1,8 @@
+import asyncio
 from abc import ABC, abstractmethod
 from datetime import datetime
 from pathlib import Path, PurePath
-from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, TypeVar
+from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple, TypeVar
 
 from rich.markup import escape
 
@@ -228,6 +229,25 @@ class Crawler(ABC):
             section.on_conflict(),
         )
 
+    @staticmethod
+    async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]:
+        """
+        Similar to asyncio.gather. However, in the case of an exception, all
+        still running tasks are cancelled and the exception is rethrown.
+
+        This should always be preferred over asyncio.gather in crawler code so
+        that an exception like CrawlError may actually stop the crawler.
+        """
+
+        tasks = [asyncio.ensure_future(aw) for aw in awaitables]
+        result = asyncio.gather(*tasks)
+        try:
+            return await result
+        except:  # noqa: E722
+            for task in tasks:
+                task.cancel()
+            raise
+
     async def crawl(self, path: PurePath) -> Optional[CrawlToken]:
         log.explain_topic(f"Decision: Crawl {fmt_path(path)}")
 
diff --git a/PFERD/crawlers/ilias/kit_ilias_web_crawler.py b/PFERD/crawlers/ilias/kit_ilias_web_crawler.py
index 9094a7b..597ea17 100644
--- a/PFERD/crawlers/ilias/kit_ilias_web_crawler.py
+++ b/PFERD/crawlers/ilias/kit_ilias_web_crawler.py
@@ -1,4 +1,3 @@
-import asyncio
 import re
 from pathlib import PurePath
 from typing import Any, Awaitable, Callable, Dict, Optional, Set, TypeVar, Union
@@ -215,7 +214,7 @@ class KitIliasWebCrawler(HttpCrawler):
             # this method without having spawned a single task. Due to this we do
             # not need to cancel anything or worry about this gather call or the forks
             # further up.
-            await asyncio.gather(*tasks)
+            await self.gather(tasks)
 
         await impl()
 
@@ -240,7 +239,7 @@ class KitIliasWebCrawler(HttpCrawler):
             # this method without having spawned a single task. Due to this we do
             # not need to cancel anything or worry about this gather call or the forks
             # further up.
-            await asyncio.gather(*tasks)
+            await self.gather(tasks)
 
         await impl()
 
diff --git a/PFERD/crawlers/local.py b/PFERD/crawlers/local.py
index 176f36d..35e5829 100644
--- a/PFERD/crawlers/local.py
+++ b/PFERD/crawlers/local.py
@@ -83,7 +83,7 @@ class LocalCrawler(Crawler):
                 pure_child = pure / child.name
                 tasks.append(self._crawl_path(child, pure_child))
 
-        await asyncio.gather(*tasks)
+        await self.gather(tasks)
 
     async def _crawl_file(self, path: Path, pure: PurePath) -> None:
         stat = path.stat()
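
For reference, below is a minimal standalone sketch of the semantics this patch introduces: when one awaitable raises (e.g. a CrawlError), the remaining tasks are cancelled instead of being left running, and the exception propagates to the caller. The CrawlError stub and the two demo coroutines are invented for illustration only; just the gather body mirrors the patch.

import asyncio

# Stand-in for PFERD's real CrawlError, defined here only to keep the
# sketch self-contained.
class CrawlError(Exception):
    pass

async def gather(awaitables):
    """Like asyncio.gather, but cancels all remaining tasks if one fails."""
    tasks = [asyncio.ensure_future(aw) for aw in awaitables]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:  # the patch uses a bare `except:`, which is equivalent
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        raise

async def crawl_slowly(name):
    await asyncio.sleep(5)
    print(f"{name} finished")  # never reached: the task is cancelled
    return name

async def crawl_and_fail():
    raise CrawlError("simulated crawl failure")

async def main():
    try:
        await gather([crawl_slowly("a"), crawl_slowly("b"), crawl_and_fail()])
    except CrawlError as e:
        # With plain asyncio.gather the two crawl_slowly tasks would keep
        # running in the background at this point; with the cancelling
        # variant they have already been told to stop.
        print(f"crawler stopped: {e}")

asyncio.run(main())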