diff --git a/.gitignore b/.gitignore index 9ee2832..c888722 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -/.mypy_cache/ +.mypy_cache/ /.venv/ /PFERD.egg-info/ __pycache__/ diff --git a/PFERD/__init__.py b/PFERD/__init__.py index b657171..7b3a3c1 100644 --- a/PFERD/__init__.py +++ b/PFERD/__init__.py @@ -1,2 +1,40 @@ +import argparse +from pathlib import Path + +from .config import Config, ConfigDumpException, ConfigLoadException + + def main() -> None: - print("Hello world") + parser = argparse.ArgumentParser() + parser.add_argument( + "--config", "-c", + type=Path, + metavar="PATH", + help="specify custom config file path", + ) + parser.add_argument( + "--dump-config", + nargs="?", + const=True, + type=Path, + metavar="PATH", + help="dump current configuration to a file and exit." + " Uses default config file path if no path is specified", + ) + args = parser.parse_args() + + try: + config_parser = Config.load_parser(args.config) + config = Config(config_parser) + except ConfigLoadException: + exit(1) + + if args.dump_config: + path = None if args.dump_config is True else args.dump_config + try: + config.dump(path) + except ConfigDumpException: + exit(1) + exit() + + print(config) diff --git a/PFERD/config.py b/PFERD/config.py new file mode 100644 index 0000000..d71e4d1 --- /dev/null +++ b/PFERD/config.py @@ -0,0 +1,101 @@ +import configparser +import os +from pathlib import Path +from typing import Optional + +from .utils import prompt_yes_no + + +class ConfigLoadException(Exception): + pass + + +class ConfigDumpException(Exception): + pass + + +class Config: + @staticmethod + def _default_path() -> Path: + if os.name == "posix": + return Path("~/.config/PFERD/pferd.cfg").expanduser() + elif os.name == "nt": + return Path("~/AppData/Roaming/PFERD/pferd.cfg").expanduser() + else: + return Path("~/.pferd.cfg").expanduser() + + def __init__(self, parser: configparser.ConfigParser): + self._parser = parser + # TODO Load and validate config into dataclasses + + @staticmethod + def _fail_load(path: Path, reason: str) -> None: + print(f"Failed to load config file at {path}") + print(f"Reason: {reason}") + raise ConfigLoadException() + + @staticmethod + def load_parser(path: Optional[Path] = None) -> configparser.ConfigParser: + """ + May throw a ConfigLoadException. + """ + + if not path: + path = Config._default_path() + + parser = configparser.ConfigParser() + + # Using config.read_file instead of config.read because config.read + # would just ignore a missing file and carry on. + try: + with open(path) as f: + parser.read_file(f, source=str(path)) + except FileNotFoundError: + Config._fail_load(path, "File does not exist") + except IsADirectoryError: + Config._fail_load(path, "That's a directory, not a file") + except PermissionError: + Config._fail_load(path, "Insufficient permissions") + + return parser + + @staticmethod + def _fail_dump(path: Path, reason: str) -> None: + print(f"Failed to dump config file to {path}") + print(f"Reason: {reason}") + raise ConfigDumpException() + + def dump(self, path: Optional[Path] = None) -> None: + """ + May throw a ConfigDumpException. + """ + + if not path: + path = self._default_path() + + print(f"Dumping config to {path}") + + try: + path.parent.mkdir(parents=True, exist_ok=True) + except PermissionError: + self._fail_dump(path, "Could not create parent directory") + + try: + # Ensuring we don't accidentally overwrite any existing files by + # always asking before overwriting a file. + try: + # x = open for exclusive creation, failing if the file already + # exists + with open(path, "x") as f: + self._parser.write(f) + except FileExistsError: + print("That file already exists.") + if prompt_yes_no("Overwrite it?", default=False): + with open(path, "w") as f: + self._parser.write(f) + else: + self._fail_dump(path, "File already exists") + except IsADirectoryError: + self._fail_dump(path, "That's a directory, not a file") + except PermissionError: + self._fail_dump(path, "Insufficient permissions") diff --git a/PFERD/utils.py b/PFERD/utils.py index 56c101a..4e1b5d7 100644 --- a/PFERD/utils.py +++ b/PFERD/utils.py @@ -1,98 +1,25 @@ -""" -A few utility bobs and bits. -""" - -import re -from pathlib import Path, PurePath -from typing import Optional, Tuple, Union - -import bs4 -import requests - -from .progress import ProgressSettings, progress_for, size_from_headers - -PathLike = Union[PurePath, str, Tuple[str, ...]] +from typing import Optional -def to_path(pathlike: PathLike) -> Path: +def prompt_yes_no(query: str, default: Optional[bool]) -> bool: """ - Convert a given PathLike into a Path. - """ - if isinstance(pathlike, tuple): - return Path(*pathlike) - return Path(pathlike) - - -Regex = Union[str, re.Pattern] - - -def to_pattern(regex: Regex) -> re.Pattern: - """ - Convert a regex to a re.Pattern. - """ - if isinstance(regex, re.Pattern): - return regex - return re.compile(regex) - - -def soupify(response: requests.Response) -> bs4.BeautifulSoup: - """ - Wrap a requests response in a bs4 object. - """ - - return bs4.BeautifulSoup(response.text, "html.parser") - - -def stream_to_path( - response: requests.Response, - target: Path, - progress_name: Optional[str] = None, - chunk_size: int = 1024 ** 2 -) -> None: - """ - Download a requests response content to a file by streaming it. This - function avoids excessive memory usage when downloading large files. The - chunk_size is in bytes. - - If progress_name is None, no progress bar will be shown. Otherwise a progress - bar will appear, if the download is bigger than an internal threshold. - """ - - with response: - length = size_from_headers(response) - if progress_name and length and int(length) > 1024 * 1024 * 10: # 10 MiB - settings: Optional[ProgressSettings] = ProgressSettings(progress_name, length) - else: - settings = None - - with open(target, 'wb') as file_descriptor: - with progress_for(settings) as progress: - for chunk in response.iter_content(chunk_size=chunk_size): - file_descriptor.write(chunk) - progress.advance(len(chunk)) - - -def prompt_yes_no(question: str, default: Optional[bool] = None) -> bool: - """ - Prompts the user a yes/no question and returns their choice. + Asks the user a yes/no question and returns their choice. """ if default is True: - prompt = "[Y/n]" + query += " [Y/n] " elif default is False: - prompt = "[y/N]" + query += " [y/N] " else: - prompt = "[y/n]" - - text = f"{question} {prompt} " - wrong_reply = "Please reply with 'yes'/'y' or 'no'/'n'." + query += " [y/n] " while True: - response = input(text).strip().lower() - if response in {"yes", "ye", "y"}: + response = input(query).strip().lower() + if response == "y": return True - if response in {"no", "n"}: + elif response == "n": return False - if response == "" and default is not None: + elif response == "" and default is not None: return default - print(wrong_reply) + + print("Please answer with 'y' or 'n'.")