From 6584d6a905b0f9f679c1c88d6c384c4ee4e39c45 Mon Sep 17 00:00:00 2001 From: I-Al-Istannen Date: Mon, 20 Apr 2020 15:39:38 +0200 Subject: [PATCH] Elaborate accept_file in new_organizer --- PFERD/new_organizer.py | 79 +++++++++++++++++++++++++++++++++--------- PFERD/utils.py | 23 ++++++++++++ 2 files changed, 85 insertions(+), 17 deletions(-) diff --git a/PFERD/new_organizer.py b/PFERD/new_organizer.py index 27e7a7d..e67646c 100644 --- a/PFERD/new_organizer.py +++ b/PFERD/new_organizer.py @@ -3,14 +3,24 @@ A organizer is bound to a single directory. """ +import filecmp import logging import shutil from pathlib import Path from typing import List, Set -from .utils import prompt_yes_no +from .utils import PrettyLogger, prompt_yes_no, resolve_path logger = logging.getLogger(__name__) +pretty_logger = PrettyLogger(logger) + + +class FileAcceptException(Exception): + """An exception while accepting a file.""" + + def __init__(self, message: str): + """Create a new exception.""" + super().__init__(message) class Organizer(): @@ -20,35 +30,70 @@ class Organizer(): """Create a new organizer for a given path.""" self._path = path self._known_files: Set[Path] = set() + # Keep the root dir + self.mark(path) @property def path(self) -> Path: """Return the path for this organizer.""" return self._path - # TODO: Name this method :/ move_from? add_file? new_file? - def accept_file(self, source: Path, target: Path) -> None: + def resolve(self, target_file: Path) -> Path: + """Resolve a file relative to the path of this organizer. + + Raises an exception if the file is outside the given directory. + """ + return resolve_path(self.path, target_file) + + def accept_file(self, src: Path, dst: Path) -> None: """Move a file to this organizer and mark it.""" - source_absolute = self.path.joinpath(source).absolute() - target_absolute = self.path.joinpath(target).absolute() + src_absolute = src.resolve() + dst_absolute = self.resolve(dst) - logger.debug(f"Copying '{source_absolute}' to '{target_absolute}") + if not src_absolute.exists(): + raise FileAcceptException("Source file does not exist") - shutil.move(str(source_absolute), str(target_absolute)) + if not src_absolute.is_file(): + raise FileAcceptException("Source is a directory") - self.mark_file(target) + logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}") - # TODO: Name this method :/ track_file? - def mark_file(self, path: Path) -> None: + # Destination file is directory + if dst_absolute.exists() and dst_absolute.is_dir(): + if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False): + shutil.rmtree(dst_absolute) + else: + logger.warn(f"Could not add file {dst_absolute}") + return + + # Destination file exists + if dst_absolute.exists() and dst_absolute.is_file(): + if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False): + pretty_logger.ignored_file(dst_absolute) + + # Bail out, nothing more to do + self.mark(dst) + return + else: + pretty_logger.modified_file(dst_absolute) + else: + pretty_logger.new_file(dst_absolute) + + # Create parent dir if needed + dst_parent_dir: Path = dst_absolute.parent + dst_parent_dir.mkdir(exist_ok=True, parents=True) + + # Move file + shutil.move(str(src_absolute), str(dst_absolute)) + + self.mark(dst) + + def mark(self, path: Path) -> None: """Mark a file as used so it will not get cleaned up.""" - absolute_path = self.path.joinpath(path).absolute() + absolute_path = self.path.joinpath(path).resolve() self._known_files.add(absolute_path) logger.debug(f"Tracked {absolute_path}") - def resolve_file(self, file_path: Path) -> Path: - """Resolve a file relative to the path of this organizer.""" - return self.path.joinpath(file_path) - def cleanup(self) -> None: """Remove all untracked files in the organizer's dir.""" logger.debug("Deleting all untracked files...") @@ -63,12 +108,12 @@ class Organizer(): if path.is_dir(): self._cleanup(path) else: - if path.absolute() not in self._known_files: + if path.resolve() not in self._known_files: self._delete_file_if_confirmed(path) # Delete dir if it was empty and untracked dir_empty = len(list(start_dir.iterdir())) == 0 - if start_dir.absolute() not in self._known_files and dir_empty: + if start_dir.resolve() not in self._known_files and dir_empty: start_dir.rmdir() def _delete_file_if_confirmed(self, path: Path) -> None: diff --git a/PFERD/utils.py b/PFERD/utils.py index 43bdd33..49cf31b 100644 --- a/PFERD/utils.py +++ b/PFERD/utils.py @@ -52,6 +52,29 @@ def prompt_yes_no(question: str, default: Optional[bool] = None) -> bool: print(WRONG_REPLY) +class ResolveException(Exception): + """An exception while resolving a file.""" + + def __init__(self, message: str): + """Create a new exception.""" + super().__init__(message) + + +def resolve_path(directory: Path, target_file: Path) -> Path: + """Resolve a file relative to the path of this organizer. + + Raises a [ResolveException] if the file is outside the given directory. + """ + absolute_path = directory.joinpath(target_file).resolve() + + if not str(absolute_path).startswith(str(directory.resolve())): + raise ResolveException( + f"Path resolved to file outside folder ({absolute_path})" + ) + + return absolute_path + + class PrettyLogger: def __init__(self, logger: logging.Logger) -> None: