pferd/PFERD/organizer.py

110 lines
3.4 KiB
Python
Raw Normal View History

2020-04-20 18:50:23 +02:00
"""A simple helper for managing downloaded files.
An organizer is bound to a single directory.
"""
2018-11-23 11:08:31 +01:00
import filecmp
2018-11-23 09:56:59 +01:00
import logging
import shutil
2020-04-23 19:38:41 +02:00
from pathlib import Path, PurePath
2020-04-20 18:50:23 +02:00
from typing import List, Set
2018-11-23 09:56:59 +01:00
2020-04-23 19:38:28 +02:00
from .location import Location
from .utils import PrettyLogger, prompt_yes_no
2020-04-20 19:27:26 +02:00
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Wraps LOGGER; used below to report ignored, modified and new files.
PRETTY = PrettyLogger(LOGGER)
2018-11-23 09:56:59 +01:00
2019-04-24 14:34:20 +02:00
2020-04-20 18:50:23 +02:00
class FileAcceptException(Exception):
    """An exception while accepting a file.

    Raised by ``Organizer.accept_file`` when the source path does not exist
    or is not a regular file.
    """
2019-04-24 14:34:20 +02:00
2020-04-20 19:27:26 +02:00
class Organizer(Location):
    """A helper for managing downloaded files.

    An organizer is bound to a single directory. Files handed to
    ``accept_file`` are moved into that directory and marked as known;
    ``cleanup`` afterwards asks, for every file in the directory that was not
    marked, whether it should be deleted.
    """

    def __init__(self, path: Path):
        """Create a new organizer for a given path."""
        super().__init__(path)

        # Paths (as returned by ``self.resolve``) that ``cleanup`` must keep.
        self._known_files: Set[Path] = set()

        # Keep the root dir
        self.mark(path)

    def accept_file(self, src: Path, dst: PurePath) -> None:
        """Move a file to this organizer and mark it.

        Args:
            src: The file to move into the organizer.
            dst: Where the file should end up; passed through
                ``self.resolve`` to obtain the final location.

        Raises:
            FileAcceptException: If ``src`` does not exist or is not a file.
        """
        src_absolute = src.resolve()
        dst_absolute = self.resolve(dst)

        if not src_absolute.exists():
            raise FileAcceptException("Source file does not exist")

        if not src_absolute.is_file():
            raise FileAcceptException("Source is a directory")

        LOGGER.debug("Moving %s to %s", src_absolute, dst_absolute)

        # Destination is a directory: only replace it with explicit consent.
        # (``is_dir``/``is_file`` already return False for missing paths.)
        if dst_absolute.is_dir():
            if not prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
                LOGGER.warning("Could not add file %s", dst_absolute)
                return
            shutil.rmtree(dst_absolute)

        # Destination file exists
        if dst_absolute.is_file():
            if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
                # Contents are identical: keep the existing file, just track it.
                PRETTY.ignored_file(dst_absolute)
                self.mark(dst)
                return
            PRETTY.modified_file(dst_absolute)
        else:
            PRETTY.new_file(dst_absolute)

        # Create parent dir if needed
        dst_parent_dir: Path = dst_absolute.parent
        dst_parent_dir.mkdir(exist_ok=True, parents=True)

        # Move file
        shutil.move(str(src_absolute), str(dst_absolute))

        self.mark(dst)

    def mark(self, path: PurePath) -> None:
        """Mark a file as used so it will not get cleaned up."""
        absolute_path = self.resolve(path)
        self._known_files.add(absolute_path)
        LOGGER.debug("Tracked %s", absolute_path)

    def cleanup(self) -> None:
        """Remove all untracked files in the organizer's dir."""
        LOGGER.debug("Deleting all untracked files...")

        self._cleanup(self.path)

    def _cleanup(self, start_dir: Path) -> None:
        """Recursively offer untracked files below ``start_dir`` for deletion.

        Directories that end up empty and are not tracked themselves are
        removed as well.
        """
        # The directory may never have been created (e.g. nothing was
        # accepted yet); ``iterdir`` would raise FileNotFoundError then.
        if not start_dir.exists():
            return

        # Snapshot the listing first, entries are deleted while we walk it.
        paths: List[Path] = list(start_dir.iterdir())

        # Recursively clean paths
        for path in paths:
            if path.is_dir():
                self._cleanup(path)
            elif path.resolve() not in self._known_files:
                self._delete_file_if_confirmed(path)

        # Delete dir if it was empty and untracked
        dir_empty = not any(start_dir.iterdir())
        if dir_empty and start_dir.resolve() not in self._known_files:
            start_dir.rmdir()

    @staticmethod
    def _delete_file_if_confirmed(path: Path) -> None:
        """Ask the user whether ``path`` should be deleted and unlink it if so."""
        prompt = f"Do you want to delete {path}"

        if prompt_yes_no(prompt, False):
            path.unlink()