mirror of
				https://github.com/Garmelon/PFERD.git
				synced 2025-10-25 19:12:30 +02:00 
			
		
		
		
	Add file organizer
This commit is contained in:
		
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,4 +1,5 @@ | |||||||
| **/__pycache__/ | **/__pycache__/ | ||||||
|  | .tmp/ | ||||||
| bin/ | bin/ | ||||||
| include/ | include/ | ||||||
| lib/ | lib/ | ||||||
|   | |||||||
| @@ -1,7 +1,9 @@ | |||||||
| from .authenticator import * | from .authenticator import * | ||||||
|  | from .organizer import * | ||||||
|  |  | ||||||
| __all__ = ( | __all__ = ( | ||||||
| 	authenticator.__all__ | 	authenticator.__all__ + | ||||||
|  | 	organizer.__all__ | ||||||
| ) | ) | ||||||
|  |  | ||||||
| LOG_FORMAT = "[%(levelname)s] %(message)s" | LOG_FORMAT = "[%(levelname)s] %(message)s" | ||||||
|   | |||||||
							
								
								
									
										109
									
								
								PFERD/organizer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										109
									
								
								PFERD/organizer.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,109 @@ | |||||||
|  | import logging | ||||||
|  | import pathlib | ||||||
|  | import shutil | ||||||
|  |  | ||||||
|  | __all__ = [ | ||||||
|  | 	"Organizer" | ||||||
|  | ] | ||||||
|  | logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  | class Organizer: | ||||||
|  | 	def __init__(self, base_dir, sync_dir): | ||||||
|  | 		""" | ||||||
|  | 		base_dir - the .tmp directory will be created here | ||||||
|  | 		sync_dir - synced files will be moved here | ||||||
|  | 		Both are expected to be concrete pathlib paths. | ||||||
|  | 		""" | ||||||
|  |  | ||||||
|  | 		self._base_dir = base_dir | ||||||
|  | 		self._sync_dir = sync_dir | ||||||
|  |  | ||||||
|  | 		self._temp_dir = pathlib.Path(self._base_dir, ".tmp") | ||||||
|  | 		self._temp_nr = 0 | ||||||
|  |  | ||||||
|  | 		# check if base/sync dir exist? | ||||||
|  |  | ||||||
|  | 		self._added_files = set() | ||||||
|  |  | ||||||
|  | 	def clean_temp_dir(self): | ||||||
|  | 		if self._temp_dir.exists(): | ||||||
|  | 			shutil.rmtree(self._temp_dir) | ||||||
|  | 		self._temp_dir.mkdir(exist_ok=True) | ||||||
|  |  | ||||||
|  | 	def temp_file(self): | ||||||
|  | 		# generate the path to a new temp file in base_path/.tmp/ | ||||||
|  | 		# make sure no two paths are the same | ||||||
|  | 		nr = self._temp_nr | ||||||
|  | 		self._temp_nr += 1 | ||||||
|  | 		return pathlib.Path(self._temp_dir, f"{nr:08}.tmp").resolve() | ||||||
|  |  | ||||||
|  | 	def add_file(self, from_path, to_path): | ||||||
|  | 		# check if sync_dir/to_path is inside sync_dir? | ||||||
|  | 		to_path = pathlib.Path(self._sync_dir, to_path) | ||||||
|  | 		if to_path.exists(): | ||||||
|  | 			logger.info(f"Overwriting {to_path}.") | ||||||
|  |  | ||||||
|  | 		# copy the file from from_path to sync_dir/to_path | ||||||
|  | 		to_path.parent.mkdir(parents=True, exist_ok=True) | ||||||
|  | 		from_path.replace(to_path) | ||||||
|  |  | ||||||
|  | 		# remember path for later reference | ||||||
|  | 		self._added_files.add(to_path.resolve()) | ||||||
|  |  | ||||||
|  | 	def clean_sync_dir(self): | ||||||
|  | 		self._clean_dir(self._sync_dir, remove_parent=False) | ||||||
|  |  | ||||||
|  | 	def _clean_dir(self, path, remove_parent=True): | ||||||
|  | 		for child in path.iterdir(): | ||||||
|  | 			if child.is_dir(): | ||||||
|  | 				self._clean_dir(child, remove_parent=True) | ||||||
|  | 			elif child.resolve() not in self._added_files: | ||||||
|  | 				if self._prompt_yes_no(f"Delete {child}?", default=False): | ||||||
|  | 					logger.debug(f"Deleting {child.resolve()}") | ||||||
|  | 					child.unlink() | ||||||
|  |  | ||||||
|  | 		if remove_parent: | ||||||
|  | 			try: | ||||||
|  | 				path.rmdir() | ||||||
|  | 			except OSError: # directory not empty | ||||||
|  | 				pass | ||||||
|  |  | ||||||
|  | 	def _prompt_yes_no(self, question, default=None): | ||||||
|  | 		if default is True: | ||||||
|  | 			prompt = "[Y/n]" | ||||||
|  | 		elif default is False: | ||||||
|  | 			prompt = "[y/N]" | ||||||
|  | 		else: | ||||||
|  | 			prompt = "[y/n]" | ||||||
|  |  | ||||||
|  | 		text = f"{question} {prompt} " | ||||||
|  | 		wrong_reply = "Please reply with 'yes'/'y' or 'no'/'n'." | ||||||
|  |  | ||||||
|  | 		while True: | ||||||
|  | 			response = input(text).strip().lower() | ||||||
|  | 			if response in {"yes", "ye", "y"}: | ||||||
|  | 				return True | ||||||
|  | 			elif response in {"no", "n"}: | ||||||
|  | 				return False | ||||||
|  | 			elif response == "": | ||||||
|  | 				if default is None: | ||||||
|  | 					print(wrong_reply) | ||||||
|  | 				else: | ||||||
|  | 					return default | ||||||
|  | 			else: | ||||||
|  | 				print(wrong_reply) | ||||||
|  |  | ||||||
|  | # How to use: | ||||||
|  | # | ||||||
|  | # 1. Before downloading any files | ||||||
|  | # orga = Organizer("/home/user/sync/", "/home/user/sync/bookstore/") | ||||||
|  | # orga.clean_temp_dir() | ||||||
|  | # | ||||||
|  | # 2. Downloading a file | ||||||
|  | # tempfile = orga.temp_file() | ||||||
|  | # download_something_to(tempfile) | ||||||
|  | # orga.add_file(tempfile, "books/douglas_adams/hhgttg" | ||||||
|  | # | ||||||
|  | # 3. After downloading all files | ||||||
|  | # orga.clean_sync_dir() | ||||||
|  | # orga.clean_temp_dir() | ||||||
							
								
								
									
										21
									
								
								test.py
									
									
									
									
									
								
							
							
						
						
									
										21
									
								
								test.py
									
									
									
									
									
								
							| @@ -1,6 +1,7 @@ | |||||||
| import PFERD | import PFERD | ||||||
| import asyncio | import asyncio | ||||||
| import logging | import logging | ||||||
|  | import pathlib | ||||||
| import os | import os | ||||||
| import sys | import sys | ||||||
|  |  | ||||||
| @@ -8,13 +9,23 @@ logging.basicConfig(level=logging.DEBUG, format=PFERD.LOG_FORMAT) | |||||||
| #logging.basicConfig(level=logging.INFO, format=PFERD.LOG_FORMAT) | #logging.basicConfig(level=logging.INFO, format=PFERD.LOG_FORMAT) | ||||||
|  |  | ||||||
| async def test_download(): | async def test_download(): | ||||||
|  | 	base_path = pathlib.Path(".") | ||||||
|  | 	sync_path = pathlib.Path(base_path, "synctest") | ||||||
|  |  | ||||||
|  | 	orga = PFERD.Organizer(base_path, sync_path) | ||||||
| 	auth = PFERD.ShibbolethAuthenticator(cookie_path="cookie_jar") | 	auth = PFERD.ShibbolethAuthenticator(cookie_path="cookie_jar") | ||||||
| 	soup = await auth.get_webpage("885157") | 	#soup = await auth.get_webpage("885157") | ||||||
|  |  | ||||||
|  | 	orga.clean_temp_dir() | ||||||
|  |  | ||||||
|  | 	filename = orga.temp_file() | ||||||
|  | 	print(filename) | ||||||
|  | 	await auth.download_file("file_886544_download", filename) | ||||||
|  | 	orga.add_file(filename, pathlib.Path("test.pdf")) | ||||||
|  | 	 | ||||||
|  | 	orga.clean_sync_dir() | ||||||
|  | 	orga.clean_temp_dir() | ||||||
| 	await auth.close() | 	await auth.close() | ||||||
| 	if soup: |  | ||||||
| 		print("Soup acquired!") |  | ||||||
| 	else: |  | ||||||
| 		print("No soup acquired :(") |  | ||||||
|  |  | ||||||
| def main(): | def main(): | ||||||
| 	#print(f"  os.getcwd(): {os.getcwd()}") | 	#print(f"  os.getcwd(): {os.getcwd()}") | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Joscha
					Joscha