pferd/PFERD/organizer.py

225 lines
7.8 KiB
Python
Raw Permalink Normal View History

2020-04-20 18:50:23 +02:00
"""A simple helper for managing downloaded files.
An organizer is bound to a single directory.
"""
2018-11-23 11:08:31 +01:00
import filecmp
2018-11-23 09:56:59 +01:00
import logging
import os
2018-11-23 09:56:59 +01:00
import shutil
from enum import Enum
2020-04-23 19:38:41 +02:00
from pathlib import Path, PurePath
from typing import Callable, List, Optional, Set
2018-11-23 09:56:59 +01:00
from .download_summary import DownloadSummary
2020-04-23 19:38:28 +02:00
from .location import Location
2020-04-25 19:59:58 +02:00
from .logging import PrettyLogger
from .utils import prompt_yes_no
2020-04-20 19:27:26 +02:00
# Module-level logger, named after this module; used for debug output.
LOGGER = logging.getLogger(__name__)
# PrettyLogger built on top of LOGGER; used below for the user-facing
# warning / ignored-file / new-file / modified-file messages.
PRETTY = PrettyLogger(LOGGER)
2018-11-23 09:56:59 +01:00
2019-04-24 14:34:20 +02:00
class ConflictType(Enum):
    """
    The kind of conflict the organizer ran into.

    A conflict arises either because a file already on disk is about to be
    replaced, or because a file on disk is about to be removed.

    Members:
        FILE_OVERWRITTEN: An existing file will be updated.
        MARKED_FILE_OVERWRITTEN: A file is written for the second (or later)
            time during the same run.
        FILE_DELETED: The file is being deleted.
    """

    # An existing file is replaced by a different version.
    FILE_OVERWRITTEN = "overwritten"

    # The same destination is written more than once in a single run.
    MARKED_FILE_OVERWRITTEN = "marked_file_overwritten"

    # A file is removed.
    FILE_DELETED = "deleted"
class FileConflictResolution(Enum):
    """
    The possible reactions to a file conflict.

    Members:
        DESTROY_EXISTING: Delete/overwrite the current file.
        KEEP_EXISTING: Keep the current file.
        DEFAULT: Do whatever the PFERD authors thought is sensible.
        PROMPT: Interactively ask the user.
    """

    # Delete or overwrite what is currently on disk.
    DESTROY_EXISTING = "destroy"

    # Leave what is currently on disk untouched.
    KEEP_EXISTING = "keep"

    # Fall back to the default chosen at the call site.
    DEFAULT = "default"

    # Ask the user interactively.
    PROMPT = "prompt"
# Signature of a conflict resolver: it is called with the affected path and the
# kind of conflict, and returns the resolution that should be applied.
FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution]
def resolve_prompt_user(_path: PurePath, conflict: ConflictType) -> FileConflictResolution:
    """
    Conflict resolver that asks the user unless a file is merely being updated.

    A plain overwrite of an existing file is always allowed. Every other
    conflict (a file written twice in one run, or a file about to be deleted)
    is resolved by prompting the user. The path argument is ignored.
    """
    needs_confirmation = conflict != ConflictType.FILE_OVERWRITTEN
    return (
        FileConflictResolution.PROMPT
        if needs_confirmation
        else FileConflictResolution.DESTROY_EXISTING
    )
2020-04-20 18:50:23 +02:00
class FileAcceptException(Exception):
    """Raised when a file cannot be accepted by an organizer."""
2019-04-24 14:34:20 +02:00
2020-04-20 19:27:26 +02:00
class Organizer(Location):
    """A helper for managing downloaded files.

    An organizer is bound to a single directory. Files handed to
    ``accept_file`` are moved into that directory and marked as known;
    ``cleanup`` afterwards offers every file that was not marked for deletion.
    """

    def __init__(self, path: Path, conflict_resolver: FileConflictResolver = resolve_prompt_user):
        """Create a new organizer for a given path.

        Args:
            path: The directory this organizer manages.
            conflict_resolver: Callback deciding how file conflicts are
                handled. If it is falsy, the user is prompted directly.
        """
        super().__init__(path)
        self._known_files: Set[Path] = set()

        # Keep the root dir
        self._known_files.add(path.resolve())

        self.download_summary = DownloadSummary()
        self.conflict_resolver = conflict_resolver

    def accept_file(self, src: Path, dst: PurePath) -> Optional[Path]:
        """
        Move a file to this organizer and mark it.

        Returns the path the file was moved to, to allow the caller to adjust the metadata.
        As you might still need to adjust the metadata when the file was identical
        (e.g. update the timestamp), the path is also returned in this case.
        In all other cases (ignored, not overwritten, etc.) this method returns None.

        Args:
            src: The file to move into the organizer.
            dst: The destination, resolved through ``self.resolve``.

        Raises:
            FileAcceptException: If ``src`` does not exist or is not a regular file.
        """
        # Windows limits the path length to 260 for *some* historical reason
        # If you want longer paths, you will have to add the "\\?\" prefix in front of
        # your path...
        # See:
        # https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation
        if os.name == 'nt':
            src_absolute = Path("\\\\?\\" + str(src.resolve()))
            dst_absolute = Path("\\\\?\\" + str(self.resolve(dst)))
        else:
            src_absolute = src.resolve()
            dst_absolute = self.resolve(dst)

        if not src_absolute.exists():
            raise FileAcceptException("Source file does not exist")

        if not src_absolute.is_file():
            raise FileAcceptException("Source is a directory")

        LOGGER.debug("Copying %s to %s", src_absolute, dst_absolute)

        # Destination was already written during this run
        if self._is_marked(dst):
            PRETTY.warning(f"File {str(dst_absolute)!r} was already written!")
            conflict = ConflictType.MARKED_FILE_OVERWRITTEN
            # _resolve_conflict returns True when the existing file may be
            # overwritten, so the new file is only skipped when it returns False.
            if not self._resolve_conflict("Overwrite file?", dst_absolute, conflict, default=False):
                PRETTY.ignored_file(dst_absolute, "file was written previously")
                return None

        # Destination file is directory
        if dst_absolute.exists() and dst_absolute.is_dir():
            prompt = f"Overwrite folder {dst_absolute} with file?"
            conflict = ConflictType.FILE_OVERWRITTEN
            if self._resolve_conflict(prompt, dst_absolute, conflict, default=False):
                shutil.rmtree(dst_absolute)
            else:
                PRETTY.warning(f"Could not add file {str(dst_absolute)!r}")
                return None

        # Destination file exists
        if dst_absolute.exists() and dst_absolute.is_file():
            if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
                # Bail out, nothing more to do
                PRETTY.ignored_file(dst_absolute, "same file contents")
                self.mark(dst)
                return dst_absolute

            prompt = f"Overwrite file {dst_absolute}?"
            conflict = ConflictType.FILE_OVERWRITTEN
            if not self._resolve_conflict(prompt, dst_absolute, conflict, default=True):
                PRETTY.ignored_file(dst_absolute, "user conflict resolution")
                return None

            self.download_summary.add_modified_file(dst_absolute)
            PRETTY.modified_file(dst_absolute)
        else:
            self.download_summary.add_new_file(dst_absolute)
            PRETTY.new_file(dst_absolute)

        # Create parent dir if needed
        dst_parent_dir: Path = dst_absolute.parent
        dst_parent_dir.mkdir(exist_ok=True, parents=True)

        # Move file
        shutil.move(str(src_absolute), str(dst_absolute))

        self.mark(dst)

        return dst_absolute

    def mark(self, path: PurePath) -> None:
        """Mark a file as used so it will not get cleaned up."""
        absolute_path = self.resolve(path)
        self._known_files.add(absolute_path)
        LOGGER.debug("Tracked %s", absolute_path)

    def _is_marked(self, path: PurePath) -> bool:
        """
        Checks whether a file is marked.
        """
        absolute_path = self.resolve(path)
        return absolute_path in self._known_files

    def cleanup(self) -> None:
        """Remove all untracked files in the organizer's dir."""
        LOGGER.debug("Deleting all untracked files...")

        self._cleanup(self.path)

    def _cleanup(self, start_dir: Path) -> None:
        """Recursively offer untracked files below ``start_dir`` for deletion.

        Directories that end up empty and are not tracked themselves are removed.
        """
        if not start_dir.exists():
            return

        # Snapshot the listing first, as entries are deleted while walking it.
        paths: List[Path] = list(start_dir.iterdir())

        # Recursively clean paths
        for path in paths:
            if path.is_dir():
                self._cleanup(path)
            else:
                if path.resolve() not in self._known_files:
                    self._delete_file_if_confirmed(path)

        # Delete dir if it was empty and untracked
        dir_empty = not any(start_dir.iterdir())
        if start_dir.resolve() not in self._known_files and dir_empty:
            start_dir.rmdir()

    def _delete_file_if_confirmed(self, path: Path) -> None:
        """Delete ``path`` if the conflict resolution allows it, otherwise report it as ignored."""
        prompt = f"Do you want to delete {path}"

        if self._resolve_conflict(prompt, path, ConflictType.FILE_DELETED, default=False):
            self.download_summary.add_deleted_file(path)
            path.unlink()
        else:
            PRETTY.ignored_file(path, "user conflict resolution")

    def _resolve_conflict(
            self, prompt: str, path: Path, conflict: ConflictType, default: bool
    ) -> bool:
        """Decide whether the existing file at ``path`` may be destroyed.

        Args:
            prompt: The question shown if the user has to be asked.
            path: The file the conflict is about.
            conflict: The kind of conflict.
            default: The answer used for ``FileConflictResolution.DEFAULT`` and
                passed on as the default of the interactive prompt.

        Returns:
            True if the existing file may be overwritten/deleted, False if it
            must be kept.
        """
        # Without a resolver the user is asked directly.
        if not self.conflict_resolver:
            return prompt_yes_no(prompt, default=default)

        result = self.conflict_resolver(path, conflict)
        if result == FileConflictResolution.DEFAULT:
            return default
        if result == FileConflictResolution.KEEP_EXISTING:
            return False
        if result == FileConflictResolution.DESTROY_EXISTING:
            return True

        # PROMPT (or anything unrecognised) falls through to asking the user.
        return prompt_yes_no(prompt, default=default)