mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Remove old organizer
This commit is contained in:
parent
01e6972c96
commit
ed9245c14d
@ -7,7 +7,7 @@ from typing import Any, Dict, Optional
|
||||
import requests
|
||||
import requests.auth
|
||||
|
||||
from .new_organizer import Organizer
|
||||
from .organizer import Organizer
|
||||
from .tmp_dir import TmpDir
|
||||
from .utils import stream_to_path
|
||||
|
||||
|
@ -6,7 +6,7 @@ from typing import Any, Dict
|
||||
import bs4
|
||||
import requests
|
||||
|
||||
from ..new_organizer import Organizer
|
||||
from ..organizer import Organizer
|
||||
from ..tmp_dir import TmpDir
|
||||
from ..utils import soupify, stream_to_path
|
||||
from .authenticators import IliasAuthenticator
|
||||
|
@ -1,123 +0,0 @@
|
||||
"""A simple helper for managing downloaded files.
|
||||
|
||||
A organizer is bound to a single directory.
|
||||
"""
|
||||
|
||||
import filecmp
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import List, Set
|
||||
|
||||
from .utils import PrettyLogger, prompt_yes_no, resolve_path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
pretty_logger = PrettyLogger(logger)
|
||||
|
||||
|
||||
class FileAcceptException(Exception):
|
||||
"""An exception while accepting a file."""
|
||||
|
||||
def __init__(self, message: str):
|
||||
"""Create a new exception."""
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class Organizer():
|
||||
"""A helper for managing downloaded files."""
|
||||
|
||||
def __init__(self, path: Path):
|
||||
"""Create a new organizer for a given path."""
|
||||
self._path = path
|
||||
self._known_files: Set[Path] = set()
|
||||
# Keep the root dir
|
||||
self.mark(path)
|
||||
|
||||
@property
|
||||
def path(self) -> Path:
|
||||
"""Return the path for this organizer."""
|
||||
return self._path
|
||||
|
||||
def resolve(self, target_file: Path) -> Path:
|
||||
"""Resolve a file relative to the path of this organizer.
|
||||
|
||||
Raises an exception if the file is outside the given directory.
|
||||
"""
|
||||
return resolve_path(self.path, target_file)
|
||||
|
||||
def accept_file(self, src: Path, dst: Path) -> None:
|
||||
"""Move a file to this organizer and mark it."""
|
||||
src_absolute = src.resolve()
|
||||
dst_absolute = self.resolve(dst)
|
||||
|
||||
if not src_absolute.exists():
|
||||
raise FileAcceptException("Source file does not exist")
|
||||
|
||||
if not src_absolute.is_file():
|
||||
raise FileAcceptException("Source is a directory")
|
||||
|
||||
logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}")
|
||||
|
||||
# Destination file is directory
|
||||
if dst_absolute.exists() and dst_absolute.is_dir():
|
||||
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
|
||||
shutil.rmtree(dst_absolute)
|
||||
else:
|
||||
logger.warn(f"Could not add file {dst_absolute}")
|
||||
return
|
||||
|
||||
# Destination file exists
|
||||
if dst_absolute.exists() and dst_absolute.is_file():
|
||||
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
|
||||
pretty_logger.ignored_file(dst_absolute)
|
||||
|
||||
# Bail out, nothing more to do
|
||||
self.mark(dst)
|
||||
return
|
||||
else:
|
||||
pretty_logger.modified_file(dst_absolute)
|
||||
else:
|
||||
pretty_logger.new_file(dst_absolute)
|
||||
|
||||
# Create parent dir if needed
|
||||
dst_parent_dir: Path = dst_absolute.parent
|
||||
dst_parent_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
# Move file
|
||||
shutil.move(str(src_absolute), str(dst_absolute))
|
||||
|
||||
self.mark(dst)
|
||||
|
||||
def mark(self, path: Path) -> None:
|
||||
"""Mark a file as used so it will not get cleaned up."""
|
||||
absolute_path = self.path.joinpath(path).resolve()
|
||||
self._known_files.add(absolute_path)
|
||||
logger.debug(f"Tracked {absolute_path}")
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Remove all untracked files in the organizer's dir."""
|
||||
logger.debug("Deleting all untracked files...")
|
||||
|
||||
self._cleanup(self.path)
|
||||
|
||||
def _cleanup(self, start_dir: Path) -> None:
|
||||
paths: List[Path] = list(start_dir.iterdir())
|
||||
|
||||
# Recursively clean paths
|
||||
for path in paths:
|
||||
if path.is_dir():
|
||||
self._cleanup(path)
|
||||
else:
|
||||
if path.resolve() not in self._known_files:
|
||||
self._delete_file_if_confirmed(path)
|
||||
|
||||
# Delete dir if it was empty and untracked
|
||||
dir_empty = len(list(start_dir.iterdir())) == 0
|
||||
if start_dir.resolve() not in self._known_files and dir_empty:
|
||||
start_dir.rmdir()
|
||||
|
||||
def _delete_file_if_confirmed(self, path: Path) -> None:
|
||||
prompt = f"Do you want to delete {path}"
|
||||
|
||||
if prompt_yes_no(prompt, False):
|
||||
path.unlink()
|
@ -1,151 +1,123 @@
|
||||
"""A simple helper for managing downloaded files.
|
||||
|
||||
A organizer is bound to a single directory.
|
||||
"""
|
||||
|
||||
import filecmp
|
||||
import logging
|
||||
import pathlib
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import List, Set
|
||||
|
||||
from . import utils
|
||||
from .utils import PrettyLogger, prompt_yes_no, resolve_path
|
||||
|
||||
__all__ = ["Organizer"]
|
||||
logger = logging.getLogger(__name__)
|
||||
pretty = utils.PrettyLogger(logger)
|
||||
pretty_logger = PrettyLogger(logger)
|
||||
|
||||
class Organizer:
|
||||
def __init__(self, base_dir, sync_dir):
|
||||
"""
|
||||
base_dir - the .tmp directory will be created here
|
||||
sync_dir - synced files will be moved here
|
||||
Both are expected to be concrete pathlib paths.
|
||||
|
||||
class FileAcceptException(Exception):
|
||||
"""An exception while accepting a file."""
|
||||
|
||||
def __init__(self, message: str):
|
||||
"""Create a new exception."""
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class Organizer():
|
||||
"""A helper for managing downloaded files."""
|
||||
|
||||
def __init__(self, path: Path):
|
||||
"""Create a new organizer for a given path."""
|
||||
self._path = path
|
||||
self._known_files: Set[Path] = set()
|
||||
# Keep the root dir
|
||||
self.mark(path)
|
||||
|
||||
@property
|
||||
def path(self) -> Path:
|
||||
"""Return the path for this organizer."""
|
||||
return self._path
|
||||
|
||||
def resolve(self, target_file: Path) -> Path:
|
||||
"""Resolve a file relative to the path of this organizer.
|
||||
|
||||
Raises an exception if the file is outside the given directory.
|
||||
"""
|
||||
return resolve_path(self.path, target_file)
|
||||
|
||||
self._base_dir = base_dir
|
||||
self._sync_dir = sync_dir
|
||||
def accept_file(self, src: Path, dst: Path) -> None:
|
||||
"""Move a file to this organizer and mark it."""
|
||||
src_absolute = src.resolve()
|
||||
dst_absolute = self.resolve(dst)
|
||||
|
||||
self._temp_dir = pathlib.Path(self._base_dir, ".tmp")
|
||||
self._temp_nr = 0
|
||||
if not src_absolute.exists():
|
||||
raise FileAcceptException("Source file does not exist")
|
||||
|
||||
# check if base/sync dir exist?
|
||||
if not src_absolute.is_file():
|
||||
raise FileAcceptException("Source is a directory")
|
||||
|
||||
self._added_files = set()
|
||||
logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}")
|
||||
|
||||
def clean_temp_dir(self):
|
||||
if self._temp_dir.exists():
|
||||
shutil.rmtree(self._temp_dir)
|
||||
self._temp_dir.mkdir(exist_ok=True)
|
||||
logger.debug(f"Cleaned temp dir: {self._temp_dir}")
|
||||
|
||||
def temp_dir(self):
|
||||
nr = self._temp_nr
|
||||
self._temp_nr += 1
|
||||
temp_dir = pathlib.Path(self._temp_dir, f"{nr:08}").resolve()
|
||||
logger.debug(f"Produced new temp dir: {temp_dir}")
|
||||
return temp_dir
|
||||
|
||||
def temp_file(self):
|
||||
# generate the path to a new temp file in base_path/.tmp/
|
||||
# make sure no two paths are the same
|
||||
nr = self._temp_nr
|
||||
self._temp_nr += 1
|
||||
temp_file = pathlib.Path(self._temp_dir, f"{nr:08}.tmp").resolve()
|
||||
logger.debug(f"Produced new temp file: {temp_file}")
|
||||
return temp_file
|
||||
|
||||
def add_file(self, from_path, to_path):
|
||||
if not from_path.exists():
|
||||
raise utils.FileNotFoundException(f"Could not add file at {from_path}")
|
||||
|
||||
# check if sync_dir/to_path is inside sync_dir?
|
||||
to_path = pathlib.Path(self._sync_dir, to_path)
|
||||
|
||||
if to_path.exists() and to_path.is_dir():
|
||||
if self._prompt_yes_no(f"Overwrite folder {to_path} with file?", default=False):
|
||||
shutil.rmtree(to_path)
|
||||
# Destination file is directory
|
||||
if dst_absolute.exists() and dst_absolute.is_dir():
|
||||
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
|
||||
shutil.rmtree(dst_absolute)
|
||||
else:
|
||||
logger.warn(f"Could not add file {to_path}")
|
||||
logger.warn(f"Could not add file {dst_absolute}")
|
||||
return
|
||||
|
||||
if to_path.exists():
|
||||
if filecmp.cmp(from_path, to_path, shallow=False):
|
||||
pretty.ignored_file(to_path)
|
||||
# Destination file exists
|
||||
if dst_absolute.exists() and dst_absolute.is_file():
|
||||
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
|
||||
pretty_logger.ignored_file(dst_absolute)
|
||||
|
||||
# remember path for later reference
|
||||
self._added_files.add(to_path.resolve())
|
||||
logger.debug(f"Added file {to_path.resolve()}")
|
||||
|
||||
# No further action needed, especially not overwriting symlinks...
|
||||
# Bail out, nothing more to do
|
||||
self.mark(dst)
|
||||
return
|
||||
else:
|
||||
pretty.modified_file(to_path)
|
||||
pretty_logger.modified_file(dst_absolute)
|
||||
else:
|
||||
pretty.new_file(to_path)
|
||||
pretty_logger.new_file(dst_absolute)
|
||||
|
||||
# copy the file from from_path to sync_dir/to_path
|
||||
# If the file being replaced was a symlink, the link itself is overwritten,
|
||||
# not the file the link points to.
|
||||
to_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
from_path.replace(to_path)
|
||||
logger.debug(f"Moved {from_path} to {to_path}")
|
||||
# Create parent dir if needed
|
||||
dst_parent_dir: Path = dst_absolute.parent
|
||||
dst_parent_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
# remember path for later reference, after the new file was written
|
||||
# This is necessary here because otherwise, resolve() would resolve the symlink too.
|
||||
self._added_files.add(to_path.resolve())
|
||||
logger.debug(f"Added file {to_path.resolve()}")
|
||||
# Move file
|
||||
shutil.move(str(src_absolute), str(dst_absolute))
|
||||
|
||||
def clean_sync_dir(self):
|
||||
self._clean_dir(self._sync_dir, remove_parent=False)
|
||||
logger.debug(f"Cleaned sync dir: {self._sync_dir}")
|
||||
self.mark(dst)
|
||||
|
||||
def _clean_dir(self, path, remove_parent=True):
|
||||
for child in sorted(path.iterdir()):
|
||||
logger.debug(f"Looking at {child.resolve()}")
|
||||
if child.is_dir():
|
||||
self._clean_dir(child, remove_parent=True)
|
||||
elif child.resolve() not in self._added_files:
|
||||
if self._prompt_yes_no(f"Delete {child}?", default=False):
|
||||
child.unlink()
|
||||
logger.debug(f"Deleted {child}")
|
||||
def mark(self, path: Path) -> None:
|
||||
"""Mark a file as used so it will not get cleaned up."""
|
||||
absolute_path = self.path.joinpath(path).resolve()
|
||||
self._known_files.add(absolute_path)
|
||||
logger.debug(f"Tracked {absolute_path}")
|
||||
|
||||
if remove_parent:
|
||||
try:
|
||||
path.rmdir()
|
||||
except OSError: # directory not empty
|
||||
pass
|
||||
def cleanup(self) -> None:
|
||||
"""Remove all untracked files in the organizer's dir."""
|
||||
logger.debug("Deleting all untracked files...")
|
||||
|
||||
def _prompt_yes_no(self, question, default=None):
|
||||
if default is True:
|
||||
prompt = "[Y/n]"
|
||||
elif default is False:
|
||||
prompt = "[y/N]"
|
||||
else:
|
||||
prompt = "[y/n]"
|
||||
self._cleanup(self.path)
|
||||
|
||||
text = f"{question} {prompt} "
|
||||
WRONG_REPLY = "Please reply with 'yes'/'y' or 'no'/'n'."
|
||||
def _cleanup(self, start_dir: Path) -> None:
|
||||
paths: List[Path] = list(start_dir.iterdir())
|
||||
|
||||
while True:
|
||||
response = input(text).strip().lower()
|
||||
if response in {"yes", "ye", "y"}:
|
||||
return True
|
||||
elif response in {"no", "n"}:
|
||||
return False
|
||||
elif response == "":
|
||||
if default is None:
|
||||
print(WRONG_REPLY)
|
||||
else:
|
||||
return default
|
||||
# Recursively clean paths
|
||||
for path in paths:
|
||||
if path.is_dir():
|
||||
self._cleanup(path)
|
||||
else:
|
||||
print(WRONG_REPLY)
|
||||
if path.resolve() not in self._known_files:
|
||||
self._delete_file_if_confirmed(path)
|
||||
|
||||
# How to use:
|
||||
#
|
||||
# 1. Before downloading any files
|
||||
# orga = Organizer("/home/user/sync/", "/home/user/sync/bookstore/")
|
||||
# orga.clean_temp_dir()
|
||||
#
|
||||
# 2. Downloading a file
|
||||
# tempfile = orga.temp_file()
|
||||
# download_something_to(tempfile)
|
||||
# orga.add_file(tempfile, "books/douglas_adams/hhgttg"
|
||||
#
|
||||
# 3. After downloading all files
|
||||
# orga.clean_sync_dir()
|
||||
# orga.clean_temp_dir()
|
||||
# Delete dir if it was empty and untracked
|
||||
dir_empty = len(list(start_dir.iterdir())) == 0
|
||||
if start_dir.resolve() not in self._known_files and dir_empty:
|
||||
start_dir.rmdir()
|
||||
|
||||
def _delete_file_if_confirmed(self, path: Path) -> None:
|
||||
prompt = f"Do you want to delete {path}"
|
||||
|
||||
if prompt_yes_no(prompt, False):
|
||||
path.unlink()
|
||||
|
Loading…
x
Reference in New Issue
Block a user