mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Use PathLike everywhere
This commit is contained in:
parent
7f53543324
commit
1aaa6e7ab5
@ -14,7 +14,7 @@ from .location import Location
|
||||
from .organizer import Organizer
|
||||
from .tmp_dir import TmpDir
|
||||
from .transform import TF, Transform, apply_transform
|
||||
from .utils import PrettyLogger
|
||||
from .utils import PrettyLogger, PathLike, to_path
|
||||
|
||||
# TODO save known-good cookies as soon as possible
|
||||
|
||||
@ -50,20 +50,20 @@ class Pferd(Location):
|
||||
|
||||
def _ilias(
|
||||
self,
|
||||
target: Union[Path, str],
|
||||
target: PathLike,
|
||||
base_url: str,
|
||||
course_id: str,
|
||||
authenticator: IliasAuthenticator,
|
||||
cookies: Optional[Path],
|
||||
cookies: Optional[PathLike],
|
||||
dir_filter: IliasDirectoryFilter,
|
||||
transform: Transform,
|
||||
download_strategy: IliasDownloadStrategy,
|
||||
) -> None:
|
||||
# pylint: disable=too-many-locals
|
||||
cookie_jar = CookieJar(cookies)
|
||||
cookie_jar = CookieJar(to_path(cookies) if cookies else None)
|
||||
session = cookie_jar.create_session()
|
||||
tmp_dir = self._tmp_dir.new_subdir()
|
||||
organizer = Organizer(self.resolve(Path(target)))
|
||||
organizer = Organizer(self.resolve(to_path(target)))
|
||||
|
||||
crawler = IliasCrawler(base_url, course_id, session, authenticator, dir_filter)
|
||||
downloader = IliasDownloader(tmp_dir, organizer, session, authenticator, download_strategy)
|
||||
@ -83,11 +83,11 @@ class Pferd(Location):
|
||||
|
||||
def ilias_kit(
|
||||
self,
|
||||
target: Union[Path, str],
|
||||
target: PathLike,
|
||||
course_id: str,
|
||||
dir_filter: IliasDirectoryFilter = lambda x: True,
|
||||
transform: Transform = lambda x: x,
|
||||
cookies: Optional[Path] = None,
|
||||
cookies: Optional[PathLike] = None,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
download_strategy: IliasDownloadStrategy = download_modified_or_new,
|
||||
|
@ -5,10 +5,11 @@ only files whose names match a regex, or renaming files from one numbering
|
||||
scheme to another.
|
||||
"""
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from pathlib import PurePath
|
||||
from typing import Callable, List, Optional, Tuple, TypeVar, Union
|
||||
from typing import Callable, List, Optional, TypeVar
|
||||
|
||||
from .utils import PathLike, Regex, to_path, to_pattern
|
||||
|
||||
Transform = Callable[[PurePath], Optional[PurePath]]
|
||||
|
||||
@ -41,22 +42,6 @@ def apply_transform(
|
||||
result.append(transformable)
|
||||
return result
|
||||
|
||||
# Utility types and functions
|
||||
|
||||
PathLike = Union[PurePath, str, Tuple[str, ...]]
|
||||
|
||||
def _path(pathlike: PathLike) -> PurePath:
|
||||
if isinstance(pathlike, tuple):
|
||||
return PurePath(*pathlike)
|
||||
return PurePath(pathlike)
|
||||
|
||||
Regex = Union[str, re.Pattern]
|
||||
|
||||
def _pattern(regex: Regex) -> re.Pattern:
|
||||
if isinstance(regex, re.Pattern):
|
||||
return regex
|
||||
return re.compile(regex)
|
||||
|
||||
# Transform combinators
|
||||
|
||||
keep = lambda path: path
|
||||
@ -94,8 +79,8 @@ def glob(pattern: str) -> Transform:
|
||||
return predicate(lambda path: path.match(pattern))
|
||||
|
||||
def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform:
|
||||
source_path = _path(source_dir)
|
||||
target_path = _path(target_dir)
|
||||
source_path = to_path(source_dir)
|
||||
target_path = to_path(target_dir)
|
||||
def inner(path: PurePath) -> Optional[PurePath]:
|
||||
if source_path in path.parents:
|
||||
return target_path / path.relative_to(source_path)
|
||||
@ -103,8 +88,8 @@ def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform:
|
||||
return inner
|
||||
|
||||
def move(source: PathLike, target: PathLike) -> Transform:
|
||||
source_path = _path(source)
|
||||
target_path = _path(target)
|
||||
source_path = to_path(source)
|
||||
target_path = to_path(target)
|
||||
def inner(path: PurePath) -> Optional[PurePath]:
|
||||
if path == source_path:
|
||||
return target_path
|
||||
@ -120,7 +105,7 @@ def rename(source: str, target: str) -> Transform:
|
||||
|
||||
def re_move(regex: Regex, target: str) -> Transform:
|
||||
def inner(path: PurePath) -> Optional[PurePath]:
|
||||
if match := _pattern(regex).fullmatch(str(path)):
|
||||
if match := to_pattern(regex).fullmatch(str(path)):
|
||||
groups = [match.group(0)]
|
||||
groups.extend(match.groups())
|
||||
return PurePath(target.format(*groups))
|
||||
@ -129,54 +114,9 @@ def re_move(regex: Regex, target: str) -> Transform:
|
||||
|
||||
def re_rename(regex: Regex, target: str) -> Transform:
|
||||
def inner(path: PurePath) -> Optional[PurePath]:
|
||||
if match := _pattern(regex).fullmatch(path.name):
|
||||
if match := to_pattern(regex).fullmatch(path.name):
|
||||
groups = [match.group(0)]
|
||||
groups.extend(match.groups())
|
||||
return path.with_name(target.format(*groups))
|
||||
return None
|
||||
return inner
|
||||
|
||||
|
||||
# def match(regex: Union[str, re.Pattern]) -> Transform:
|
||||
# pattern: re.Pattern
|
||||
# if isinstance(regex, str):
|
||||
# pattern = re.compile(regex)
|
||||
# else:
|
||||
# pattern = regex
|
||||
|
||||
# return predicate(lambda path: bool(pattern.match(path.name)))
|
||||
|
||||
# def full_match(regex: Union[str, re.Pattern]) -> Transform:
|
||||
# pattern: re.Pattern
|
||||
# if isinstance(regex, str):
|
||||
# pattern = re.compile(regex)
|
||||
# else:
|
||||
# pattern = regex
|
||||
|
||||
# return predicate(lambda path: bool(pattern.match(str(path))))
|
||||
|
||||
# def zoom(
|
||||
# selector: Callable[[PurePath], Optional[Tuple[PurePath, PurePath]]],
|
||||
# actor: Callable[[PurePath], Transform],
|
||||
# ) -> Transform:
|
||||
# def inner(path: PurePath) -> Optional[PurePath]:
|
||||
# if selected := selector(path):
|
||||
# base, relative = selected
|
||||
# return actor(base)(relative)
|
||||
# return None
|
||||
# return inner
|
||||
|
||||
# def move_from(source: PurePath, target: PurePath) -> Transform:
|
||||
# return zoom(
|
||||
# lambda path: (source, path.relative_to(source)) if source in path.parents else None,
|
||||
# lambda _: lambda path: target / path,
|
||||
# )
|
||||
|
||||
# re_move(r"Übungsmaterial/Blätter/(\d+).pdf", "Blätter/Blatt{1:02}.pdf")
|
||||
# re_rename(r"(\d+).pdf", "Blatt{1:02}.pdf")
|
||||
|
||||
# def at(at_path: PurePath) -> Transform:
|
||||
# return predicate(lambda path: path == at_path)
|
||||
|
||||
# def inside(inside_path: PurePath) -> Transform:
|
||||
# return predicate(lambda path: inside_path in path.parents)
|
||||
|
@ -3,6 +3,7 @@ A few utility bobs and bits.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path, PurePath
|
||||
from typing import Optional, Tuple, Union
|
||||
|
||||
@ -10,25 +11,22 @@ import bs4
|
||||
import requests
|
||||
from colorama import Fore, Style
|
||||
|
||||
|
||||
def move(path: PurePath, from_folders: Tuple[str], to_folders: Tuple[str]) -> Optional[PurePath]:
|
||||
"""
|
||||
If the input path is located anywhere within from_folders, replace the
|
||||
from_folders with to_folders. Returns None otherwise.
|
||||
"""
|
||||
|
||||
length = len(from_folders)
|
||||
if path.parts[:length] == from_folders:
|
||||
return PurePath(*to_folders, *path.parts[length:])
|
||||
return None
|
||||
PathLike = Union[PurePath, str, Tuple[str, ...]]
|
||||
|
||||
|
||||
def rename(path: PurePath, to_name: str) -> PurePath:
|
||||
"""
|
||||
Set the file name of the input path to to_name.
|
||||
"""
|
||||
def to_path(pathlike: PathLike) -> Path:
|
||||
if isinstance(pathlike, tuple):
|
||||
return Path(*pathlike)
|
||||
return Path(pathlike)
|
||||
|
||||
return PurePath(*path.parts[:-1], to_name)
|
||||
|
||||
Regex = Union[str, re.Pattern]
|
||||
|
||||
|
||||
def to_pattern(regex: Regex) -> re.Pattern:
|
||||
if isinstance(regex, re.Pattern):
|
||||
return regex
|
||||
return re.compile(regex)
|
||||
|
||||
|
||||
def soupify(response: requests.Response) -> bs4.BeautifulSoup:
|
||||
@ -39,7 +37,7 @@ def soupify(response: requests.Response) -> bs4.BeautifulSoup:
|
||||
return bs4.BeautifulSoup(response.text, "html.parser")
|
||||
|
||||
|
||||
def stream_to_path(response: requests.Response, to_path: Path, chunk_size: int = 1024 ** 2) -> None:
|
||||
def stream_to_path(response: requests.Response, target: Path, chunk_size: int = 1024 ** 2) -> None:
|
||||
"""
|
||||
Download a requests response content to a file by streaming it. This
|
||||
function avoids excessive memory usage when downloading large files. The
|
||||
@ -47,7 +45,7 @@ def stream_to_path(response: requests.Response, to_path: Path, chunk_size: int =
|
||||
"""
|
||||
|
||||
with response:
|
||||
with open(to_path, 'wb') as file_descriptor:
|
||||
with open(target, 'wb') as file_descriptor:
|
||||
for chunk in response.iter_content(chunk_size=chunk_size):
|
||||
file_descriptor.write(chunk)
|
||||
|
||||
@ -86,54 +84,58 @@ class PrettyLogger:
|
||||
def __init__(self, logger: logging.Logger) -> None:
|
||||
self.logger = logger
|
||||
|
||||
def modified_file(self, path: PurePath) -> None:
|
||||
@staticmethod
|
||||
def _format_path(path: PathLike) -> str:
|
||||
return repr(str(to_path(path)))
|
||||
|
||||
def modified_file(self, path: PathLike) -> None:
|
||||
"""
|
||||
An existing file has changed.
|
||||
"""
|
||||
|
||||
self.logger.info(
|
||||
f"{Fore.MAGENTA}{Style.BRIGHT}Modified {str(path)!r}.{Style.RESET_ALL}"
|
||||
f"{Fore.MAGENTA}{Style.BRIGHT}Modified {self._format_path(path)}.{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
def new_file(self, path: PurePath) -> None:
|
||||
def new_file(self, path: PathLike) -> None:
|
||||
"""
|
||||
A new file has been downloaded.
|
||||
"""
|
||||
|
||||
self.logger.info(
|
||||
f"{Fore.GREEN}{Style.BRIGHT}Created {str(path)!r}.{Style.RESET_ALL}"
|
||||
f"{Fore.GREEN}{Style.BRIGHT}Created {self._format_path(path)}.{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
def ignored_file(self, path: PurePath, reason: str) -> None:
|
||||
def ignored_file(self, path: PathLike, reason: str) -> None:
|
||||
"""
|
||||
File was not downloaded or modified.
|
||||
"""
|
||||
|
||||
self.logger.info(
|
||||
f"{Style.DIM}Ignored {str(path)!r} "
|
||||
f"{Style.DIM}Ignored {self._format_path(path)} "
|
||||
f"({Style.NORMAL}{reason}{Style.DIM}).{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
def searching(self, path: PurePath) -> None:
|
||||
def searching(self, path: PathLike) -> None:
|
||||
"""
|
||||
A crawler searches a particular object.
|
||||
"""
|
||||
|
||||
self.logger.info(f"Searching {str(path)!r}")
|
||||
self.logger.info(f"Searching {self._format_path(path)}")
|
||||
|
||||
def not_searching(self, path: PurePath, reason: str) -> None:
|
||||
def not_searching(self, path: PathLike, reason: str) -> None:
|
||||
"""
|
||||
A crawler does not search a particular object.
|
||||
"""
|
||||
|
||||
self.logger.info(
|
||||
f"{Style.DIM}Not searching {str(path)!r} "
|
||||
f"{Style.DIM}Not searching {self._format_path(path)} "
|
||||
f"({Style.NORMAL}{reason}{Style.DIM}).{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
def starting_synchronizer(
|
||||
self,
|
||||
target_directory: Union[Path, str],
|
||||
target_directory: PathLike,
|
||||
synchronizer_name: str,
|
||||
subject: Optional[str] = None,
|
||||
) -> None:
|
||||
@ -144,6 +146,7 @@ class PrettyLogger:
|
||||
subject_str = f"{subject} " if subject else ""
|
||||
self.logger.info("")
|
||||
self.logger.info((
|
||||
f"{Fore.CYAN}{Style.BRIGHT}Synchronizing {subject_str}to {str(target_directory)!r}"
|
||||
f" using the {synchronizer_name} synchronizer.{Style.RESET_ALL}"
|
||||
f"{Fore.CYAN}{Style.BRIGHT}Synchronizing "
|
||||
f"{subject_str}to {self._format_path(target_directory)} "
|
||||
f"using the {synchronizer_name} synchronizer.{Style.RESET_ALL}"
|
||||
))
|
||||
|
Loading…
Reference in New Issue
Block a user