From ed9245c14df38b9879735a9d9e5646501a4fd26a Mon Sep 17 00:00:00 2001 From: I-Al-Istannen Date: Mon, 20 Apr 2020 18:50:23 +0200 Subject: [PATCH] Remove old organizer --- PFERD/http_downloader.py | 2 +- PFERD/ilias/downloader.py | 2 +- PFERD/new_organizer.py | 123 ---------------------- PFERD/organizer.py | 214 +++++++++++++++++--------------------- 4 files changed, 95 insertions(+), 246 deletions(-) delete mode 100644 PFERD/new_organizer.py diff --git a/PFERD/http_downloader.py b/PFERD/http_downloader.py index 3c5d531..6d68c45 100644 --- a/PFERD/http_downloader.py +++ b/PFERD/http_downloader.py @@ -7,7 +7,7 @@ from typing import Any, Dict, Optional import requests import requests.auth -from .new_organizer import Organizer +from .organizer import Organizer from .tmp_dir import TmpDir from .utils import stream_to_path diff --git a/PFERD/ilias/downloader.py b/PFERD/ilias/downloader.py index 5f1aefe..bfd6f84 100644 --- a/PFERD/ilias/downloader.py +++ b/PFERD/ilias/downloader.py @@ -6,7 +6,7 @@ from typing import Any, Dict import bs4 import requests -from ..new_organizer import Organizer +from ..organizer import Organizer from ..tmp_dir import TmpDir from ..utils import soupify, stream_to_path from .authenticators import IliasAuthenticator diff --git a/PFERD/new_organizer.py b/PFERD/new_organizer.py deleted file mode 100644 index e67646c..0000000 --- a/PFERD/new_organizer.py +++ /dev/null @@ -1,123 +0,0 @@ -"""A simple helper for managing downloaded files. - -A organizer is bound to a single directory. -""" - -import filecmp -import logging -import shutil -from pathlib import Path -from typing import List, Set - -from .utils import PrettyLogger, prompt_yes_no, resolve_path - -logger = logging.getLogger(__name__) -pretty_logger = PrettyLogger(logger) - - -class FileAcceptException(Exception): - """An exception while accepting a file.""" - - def __init__(self, message: str): - """Create a new exception.""" - super().__init__(message) - - -class Organizer(): - """A helper for managing downloaded files.""" - - def __init__(self, path: Path): - """Create a new organizer for a given path.""" - self._path = path - self._known_files: Set[Path] = set() - # Keep the root dir - self.mark(path) - - @property - def path(self) -> Path: - """Return the path for this organizer.""" - return self._path - - def resolve(self, target_file: Path) -> Path: - """Resolve a file relative to the path of this organizer. - - Raises an exception if the file is outside the given directory. - """ - return resolve_path(self.path, target_file) - - def accept_file(self, src: Path, dst: Path) -> None: - """Move a file to this organizer and mark it.""" - src_absolute = src.resolve() - dst_absolute = self.resolve(dst) - - if not src_absolute.exists(): - raise FileAcceptException("Source file does not exist") - - if not src_absolute.is_file(): - raise FileAcceptException("Source is a directory") - - logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}") - - # Destination file is directory - if dst_absolute.exists() and dst_absolute.is_dir(): - if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False): - shutil.rmtree(dst_absolute) - else: - logger.warn(f"Could not add file {dst_absolute}") - return - - # Destination file exists - if dst_absolute.exists() and dst_absolute.is_file(): - if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False): - pretty_logger.ignored_file(dst_absolute) - - # Bail out, nothing more to do - self.mark(dst) - return - else: - pretty_logger.modified_file(dst_absolute) - else: - pretty_logger.new_file(dst_absolute) - - # Create parent dir if needed - dst_parent_dir: Path = dst_absolute.parent - dst_parent_dir.mkdir(exist_ok=True, parents=True) - - # Move file - shutil.move(str(src_absolute), str(dst_absolute)) - - self.mark(dst) - - def mark(self, path: Path) -> None: - """Mark a file as used so it will not get cleaned up.""" - absolute_path = self.path.joinpath(path).resolve() - self._known_files.add(absolute_path) - logger.debug(f"Tracked {absolute_path}") - - def cleanup(self) -> None: - """Remove all untracked files in the organizer's dir.""" - logger.debug("Deleting all untracked files...") - - self._cleanup(self.path) - - def _cleanup(self, start_dir: Path) -> None: - paths: List[Path] = list(start_dir.iterdir()) - - # Recursively clean paths - for path in paths: - if path.is_dir(): - self._cleanup(path) - else: - if path.resolve() not in self._known_files: - self._delete_file_if_confirmed(path) - - # Delete dir if it was empty and untracked - dir_empty = len(list(start_dir.iterdir())) == 0 - if start_dir.resolve() not in self._known_files and dir_empty: - start_dir.rmdir() - - def _delete_file_if_confirmed(self, path: Path) -> None: - prompt = f"Do you want to delete {path}" - - if prompt_yes_no(prompt, False): - path.unlink() diff --git a/PFERD/organizer.py b/PFERD/organizer.py index 38c58d2..e67646c 100644 --- a/PFERD/organizer.py +++ b/PFERD/organizer.py @@ -1,151 +1,123 @@ +"""A simple helper for managing downloaded files. + +A organizer is bound to a single directory. +""" + import filecmp import logging -import pathlib import shutil +from pathlib import Path +from typing import List, Set -from . import utils +from .utils import PrettyLogger, prompt_yes_no, resolve_path -__all__ = ["Organizer"] logger = logging.getLogger(__name__) -pretty = utils.PrettyLogger(logger) +pretty_logger = PrettyLogger(logger) -class Organizer: - def __init__(self, base_dir, sync_dir): - """ - base_dir - the .tmp directory will be created here - sync_dir - synced files will be moved here - Both are expected to be concrete pathlib paths. + +class FileAcceptException(Exception): + """An exception while accepting a file.""" + + def __init__(self, message: str): + """Create a new exception.""" + super().__init__(message) + + +class Organizer(): + """A helper for managing downloaded files.""" + + def __init__(self, path: Path): + """Create a new organizer for a given path.""" + self._path = path + self._known_files: Set[Path] = set() + # Keep the root dir + self.mark(path) + + @property + def path(self) -> Path: + """Return the path for this organizer.""" + return self._path + + def resolve(self, target_file: Path) -> Path: + """Resolve a file relative to the path of this organizer. + + Raises an exception if the file is outside the given directory. """ + return resolve_path(self.path, target_file) - self._base_dir = base_dir - self._sync_dir = sync_dir + def accept_file(self, src: Path, dst: Path) -> None: + """Move a file to this organizer and mark it.""" + src_absolute = src.resolve() + dst_absolute = self.resolve(dst) - self._temp_dir = pathlib.Path(self._base_dir, ".tmp") - self._temp_nr = 0 + if not src_absolute.exists(): + raise FileAcceptException("Source file does not exist") - # check if base/sync dir exist? + if not src_absolute.is_file(): + raise FileAcceptException("Source is a directory") - self._added_files = set() + logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}") - def clean_temp_dir(self): - if self._temp_dir.exists(): - shutil.rmtree(self._temp_dir) - self._temp_dir.mkdir(exist_ok=True) - logger.debug(f"Cleaned temp dir: {self._temp_dir}") - - def temp_dir(self): - nr = self._temp_nr - self._temp_nr += 1 - temp_dir = pathlib.Path(self._temp_dir, f"{nr:08}").resolve() - logger.debug(f"Produced new temp dir: {temp_dir}") - return temp_dir - - def temp_file(self): - # generate the path to a new temp file in base_path/.tmp/ - # make sure no two paths are the same - nr = self._temp_nr - self._temp_nr += 1 - temp_file = pathlib.Path(self._temp_dir, f"{nr:08}.tmp").resolve() - logger.debug(f"Produced new temp file: {temp_file}") - return temp_file - - def add_file(self, from_path, to_path): - if not from_path.exists(): - raise utils.FileNotFoundException(f"Could not add file at {from_path}") - - # check if sync_dir/to_path is inside sync_dir? - to_path = pathlib.Path(self._sync_dir, to_path) - - if to_path.exists() and to_path.is_dir(): - if self._prompt_yes_no(f"Overwrite folder {to_path} with file?", default=False): - shutil.rmtree(to_path) + # Destination file is directory + if dst_absolute.exists() and dst_absolute.is_dir(): + if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False): + shutil.rmtree(dst_absolute) else: - logger.warn(f"Could not add file {to_path}") + logger.warn(f"Could not add file {dst_absolute}") return - if to_path.exists(): - if filecmp.cmp(from_path, to_path, shallow=False): - pretty.ignored_file(to_path) + # Destination file exists + if dst_absolute.exists() and dst_absolute.is_file(): + if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False): + pretty_logger.ignored_file(dst_absolute) - # remember path for later reference - self._added_files.add(to_path.resolve()) - logger.debug(f"Added file {to_path.resolve()}") - - # No further action needed, especially not overwriting symlinks... + # Bail out, nothing more to do + self.mark(dst) return else: - pretty.modified_file(to_path) + pretty_logger.modified_file(dst_absolute) else: - pretty.new_file(to_path) + pretty_logger.new_file(dst_absolute) - # copy the file from from_path to sync_dir/to_path - # If the file being replaced was a symlink, the link itself is overwritten, - # not the file the link points to. - to_path.parent.mkdir(parents=True, exist_ok=True) - from_path.replace(to_path) - logger.debug(f"Moved {from_path} to {to_path}") + # Create parent dir if needed + dst_parent_dir: Path = dst_absolute.parent + dst_parent_dir.mkdir(exist_ok=True, parents=True) - # remember path for later reference, after the new file was written - # This is necessary here because otherwise, resolve() would resolve the symlink too. - self._added_files.add(to_path.resolve()) - logger.debug(f"Added file {to_path.resolve()}") + # Move file + shutil.move(str(src_absolute), str(dst_absolute)) - def clean_sync_dir(self): - self._clean_dir(self._sync_dir, remove_parent=False) - logger.debug(f"Cleaned sync dir: {self._sync_dir}") + self.mark(dst) - def _clean_dir(self, path, remove_parent=True): - for child in sorted(path.iterdir()): - logger.debug(f"Looking at {child.resolve()}") - if child.is_dir(): - self._clean_dir(child, remove_parent=True) - elif child.resolve() not in self._added_files: - if self._prompt_yes_no(f"Delete {child}?", default=False): - child.unlink() - logger.debug(f"Deleted {child}") + def mark(self, path: Path) -> None: + """Mark a file as used so it will not get cleaned up.""" + absolute_path = self.path.joinpath(path).resolve() + self._known_files.add(absolute_path) + logger.debug(f"Tracked {absolute_path}") - if remove_parent: - try: - path.rmdir() - except OSError: # directory not empty - pass + def cleanup(self) -> None: + """Remove all untracked files in the organizer's dir.""" + logger.debug("Deleting all untracked files...") - def _prompt_yes_no(self, question, default=None): - if default is True: - prompt = "[Y/n]" - elif default is False: - prompt = "[y/N]" - else: - prompt = "[y/n]" + self._cleanup(self.path) - text = f"{question} {prompt} " - WRONG_REPLY = "Please reply with 'yes'/'y' or 'no'/'n'." + def _cleanup(self, start_dir: Path) -> None: + paths: List[Path] = list(start_dir.iterdir()) - while True: - response = input(text).strip().lower() - if response in {"yes", "ye", "y"}: - return True - elif response in {"no", "n"}: - return False - elif response == "": - if default is None: - print(WRONG_REPLY) - else: - return default + # Recursively clean paths + for path in paths: + if path.is_dir(): + self._cleanup(path) else: - print(WRONG_REPLY) + if path.resolve() not in self._known_files: + self._delete_file_if_confirmed(path) -# How to use: -# -# 1. Before downloading any files -# orga = Organizer("/home/user/sync/", "/home/user/sync/bookstore/") -# orga.clean_temp_dir() -# -# 2. Downloading a file -# tempfile = orga.temp_file() -# download_something_to(tempfile) -# orga.add_file(tempfile, "books/douglas_adams/hhgttg" -# -# 3. After downloading all files -# orga.clean_sync_dir() -# orga.clean_temp_dir() + # Delete dir if it was empty and untracked + dir_empty = len(list(start_dir.iterdir())) == 0 + if start_dir.resolve() not in self._known_files and dir_empty: + start_dir.rmdir() + + def _delete_file_if_confirmed(self, path: Path) -> None: + prompt = f"Do you want to delete {path}" + + if prompt_yes_no(prompt, False): + path.unlink()