mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Elaborate accept_file in new_organizer
This commit is contained in:
parent
5990098ef8
commit
6584d6a905
@ -3,14 +3,24 @@
|
|||||||
A organizer is bound to a single directory.
|
A organizer is bound to a single directory.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import filecmp
|
||||||
import logging
|
import logging
|
||||||
import shutil
|
import shutil
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Set
|
from typing import List, Set
|
||||||
|
|
||||||
from .utils import prompt_yes_no
|
from .utils import PrettyLogger, prompt_yes_no, resolve_path
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
pretty_logger = PrettyLogger(logger)
|
||||||
|
|
||||||
|
|
||||||
|
class FileAcceptException(Exception):
|
||||||
|
"""An exception while accepting a file."""
|
||||||
|
|
||||||
|
def __init__(self, message: str):
|
||||||
|
"""Create a new exception."""
|
||||||
|
super().__init__(message)
|
||||||
|
|
||||||
|
|
||||||
class Organizer():
|
class Organizer():
|
||||||
@ -20,35 +30,70 @@ class Organizer():
|
|||||||
"""Create a new organizer for a given path."""
|
"""Create a new organizer for a given path."""
|
||||||
self._path = path
|
self._path = path
|
||||||
self._known_files: Set[Path] = set()
|
self._known_files: Set[Path] = set()
|
||||||
|
# Keep the root dir
|
||||||
|
self.mark(path)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def path(self) -> Path:
|
def path(self) -> Path:
|
||||||
"""Return the path for this organizer."""
|
"""Return the path for this organizer."""
|
||||||
return self._path
|
return self._path
|
||||||
|
|
||||||
# TODO: Name this method :/ move_from? add_file? new_file?
|
def resolve(self, target_file: Path) -> Path:
|
||||||
def accept_file(self, source: Path, target: Path) -> None:
|
"""Resolve a file relative to the path of this organizer.
|
||||||
|
|
||||||
|
Raises an exception if the file is outside the given directory.
|
||||||
|
"""
|
||||||
|
return resolve_path(self.path, target_file)
|
||||||
|
|
||||||
|
def accept_file(self, src: Path, dst: Path) -> None:
|
||||||
"""Move a file to this organizer and mark it."""
|
"""Move a file to this organizer and mark it."""
|
||||||
source_absolute = self.path.joinpath(source).absolute()
|
src_absolute = src.resolve()
|
||||||
target_absolute = self.path.joinpath(target).absolute()
|
dst_absolute = self.resolve(dst)
|
||||||
|
|
||||||
logger.debug(f"Copying '{source_absolute}' to '{target_absolute}")
|
if not src_absolute.exists():
|
||||||
|
raise FileAcceptException("Source file does not exist")
|
||||||
|
|
||||||
shutil.move(str(source_absolute), str(target_absolute))
|
if not src_absolute.is_file():
|
||||||
|
raise FileAcceptException("Source is a directory")
|
||||||
|
|
||||||
self.mark_file(target)
|
logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}")
|
||||||
|
|
||||||
# TODO: Name this method :/ track_file?
|
# Destination file is directory
|
||||||
def mark_file(self, path: Path) -> None:
|
if dst_absolute.exists() and dst_absolute.is_dir():
|
||||||
|
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
|
||||||
|
shutil.rmtree(dst_absolute)
|
||||||
|
else:
|
||||||
|
logger.warn(f"Could not add file {dst_absolute}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Destination file exists
|
||||||
|
if dst_absolute.exists() and dst_absolute.is_file():
|
||||||
|
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
|
||||||
|
pretty_logger.ignored_file(dst_absolute)
|
||||||
|
|
||||||
|
# Bail out, nothing more to do
|
||||||
|
self.mark(dst)
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
pretty_logger.modified_file(dst_absolute)
|
||||||
|
else:
|
||||||
|
pretty_logger.new_file(dst_absolute)
|
||||||
|
|
||||||
|
# Create parent dir if needed
|
||||||
|
dst_parent_dir: Path = dst_absolute.parent
|
||||||
|
dst_parent_dir.mkdir(exist_ok=True, parents=True)
|
||||||
|
|
||||||
|
# Move file
|
||||||
|
shutil.move(str(src_absolute), str(dst_absolute))
|
||||||
|
|
||||||
|
self.mark(dst)
|
||||||
|
|
||||||
|
def mark(self, path: Path) -> None:
|
||||||
"""Mark a file as used so it will not get cleaned up."""
|
"""Mark a file as used so it will not get cleaned up."""
|
||||||
absolute_path = self.path.joinpath(path).absolute()
|
absolute_path = self.path.joinpath(path).resolve()
|
||||||
self._known_files.add(absolute_path)
|
self._known_files.add(absolute_path)
|
||||||
logger.debug(f"Tracked {absolute_path}")
|
logger.debug(f"Tracked {absolute_path}")
|
||||||
|
|
||||||
def resolve_file(self, file_path: Path) -> Path:
|
|
||||||
"""Resolve a file relative to the path of this organizer."""
|
|
||||||
return self.path.joinpath(file_path)
|
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
def cleanup(self) -> None:
|
||||||
"""Remove all untracked files in the organizer's dir."""
|
"""Remove all untracked files in the organizer's dir."""
|
||||||
logger.debug("Deleting all untracked files...")
|
logger.debug("Deleting all untracked files...")
|
||||||
@ -63,12 +108,12 @@ class Organizer():
|
|||||||
if path.is_dir():
|
if path.is_dir():
|
||||||
self._cleanup(path)
|
self._cleanup(path)
|
||||||
else:
|
else:
|
||||||
if path.absolute() not in self._known_files:
|
if path.resolve() not in self._known_files:
|
||||||
self._delete_file_if_confirmed(path)
|
self._delete_file_if_confirmed(path)
|
||||||
|
|
||||||
# Delete dir if it was empty and untracked
|
# Delete dir if it was empty and untracked
|
||||||
dir_empty = len(list(start_dir.iterdir())) == 0
|
dir_empty = len(list(start_dir.iterdir())) == 0
|
||||||
if start_dir.absolute() not in self._known_files and dir_empty:
|
if start_dir.resolve() not in self._known_files and dir_empty:
|
||||||
start_dir.rmdir()
|
start_dir.rmdir()
|
||||||
|
|
||||||
def _delete_file_if_confirmed(self, path: Path) -> None:
|
def _delete_file_if_confirmed(self, path: Path) -> None:
|
||||||
|
@ -52,6 +52,29 @@ def prompt_yes_no(question: str, default: Optional[bool] = None) -> bool:
|
|||||||
print(WRONG_REPLY)
|
print(WRONG_REPLY)
|
||||||
|
|
||||||
|
|
||||||
|
class ResolveException(Exception):
|
||||||
|
"""An exception while resolving a file."""
|
||||||
|
|
||||||
|
def __init__(self, message: str):
|
||||||
|
"""Create a new exception."""
|
||||||
|
super().__init__(message)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_path(directory: Path, target_file: Path) -> Path:
|
||||||
|
"""Resolve a file relative to the path of this organizer.
|
||||||
|
|
||||||
|
Raises a [ResolveException] if the file is outside the given directory.
|
||||||
|
"""
|
||||||
|
absolute_path = directory.joinpath(target_file).resolve()
|
||||||
|
|
||||||
|
if not str(absolute_path).startswith(str(directory.resolve())):
|
||||||
|
raise ResolveException(
|
||||||
|
f"Path resolved to file outside folder ({absolute_path})"
|
||||||
|
)
|
||||||
|
|
||||||
|
return absolute_path
|
||||||
|
|
||||||
|
|
||||||
class PrettyLogger:
|
class PrettyLogger:
|
||||||
|
|
||||||
def __init__(self, logger: logging.Logger) -> None:
|
def __init__(self, logger: logging.Logger) -> None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user