mirror of
https://github.com/Garmelon/PFERD.git
synced 2023-12-21 10:23:01 +01:00
Load and dump config
This commit is contained in:
parent
5595a908d8
commit
fbebc46c58
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,4 +1,4 @@
|
||||
/.mypy_cache/
|
||||
.mypy_cache/
|
||||
/.venv/
|
||||
/PFERD.egg-info/
|
||||
__pycache__/
|
||||
|
@ -1,2 +1,40 @@
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from .config import Config, ConfigDumpException, ConfigLoadException
|
||||
|
||||
|
||||
def main() -> None:
|
||||
print("Hello world")
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--config", "-c",
|
||||
type=Path,
|
||||
metavar="PATH",
|
||||
help="specify custom config file path",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dump-config",
|
||||
nargs="?",
|
||||
const=True,
|
||||
type=Path,
|
||||
metavar="PATH",
|
||||
help="dump current configuration to a file and exit."
|
||||
" Uses default config file path if no path is specified",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
config_parser = Config.load_parser(args.config)
|
||||
config = Config(config_parser)
|
||||
except ConfigLoadException:
|
||||
exit(1)
|
||||
|
||||
if args.dump_config:
|
||||
path = None if args.dump_config is True else args.dump_config
|
||||
try:
|
||||
config.dump(path)
|
||||
except ConfigDumpException:
|
||||
exit(1)
|
||||
exit()
|
||||
|
||||
print(config)
|
||||
|
101
PFERD/config.py
Normal file
101
PFERD/config.py
Normal file
@ -0,0 +1,101 @@
|
||||
import configparser
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from .utils import prompt_yes_no
|
||||
|
||||
|
||||
class ConfigLoadException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ConfigDumpException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Config:
|
||||
@staticmethod
|
||||
def _default_path() -> Path:
|
||||
if os.name == "posix":
|
||||
return Path("~/.config/PFERD/pferd.cfg").expanduser()
|
||||
elif os.name == "nt":
|
||||
return Path("~/AppData/Roaming/PFERD/pferd.cfg").expanduser()
|
||||
else:
|
||||
return Path("~/.pferd.cfg").expanduser()
|
||||
|
||||
def __init__(self, parser: configparser.ConfigParser):
|
||||
self._parser = parser
|
||||
# TODO Load and validate config into dataclasses
|
||||
|
||||
@staticmethod
|
||||
def _fail_load(path: Path, reason: str) -> None:
|
||||
print(f"Failed to load config file at {path}")
|
||||
print(f"Reason: {reason}")
|
||||
raise ConfigLoadException()
|
||||
|
||||
@staticmethod
|
||||
def load_parser(path: Optional[Path] = None) -> configparser.ConfigParser:
|
||||
"""
|
||||
May throw a ConfigLoadException.
|
||||
"""
|
||||
|
||||
if not path:
|
||||
path = Config._default_path()
|
||||
|
||||
parser = configparser.ConfigParser()
|
||||
|
||||
# Using config.read_file instead of config.read because config.read
|
||||
# would just ignore a missing file and carry on.
|
||||
try:
|
||||
with open(path) as f:
|
||||
parser.read_file(f, source=str(path))
|
||||
except FileNotFoundError:
|
||||
Config._fail_load(path, "File does not exist")
|
||||
except IsADirectoryError:
|
||||
Config._fail_load(path, "That's a directory, not a file")
|
||||
except PermissionError:
|
||||
Config._fail_load(path, "Insufficient permissions")
|
||||
|
||||
return parser
|
||||
|
||||
@staticmethod
|
||||
def _fail_dump(path: Path, reason: str) -> None:
|
||||
print(f"Failed to dump config file to {path}")
|
||||
print(f"Reason: {reason}")
|
||||
raise ConfigDumpException()
|
||||
|
||||
def dump(self, path: Optional[Path] = None) -> None:
|
||||
"""
|
||||
May throw a ConfigDumpException.
|
||||
"""
|
||||
|
||||
if not path:
|
||||
path = self._default_path()
|
||||
|
||||
print(f"Dumping config to {path}")
|
||||
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
except PermissionError:
|
||||
self._fail_dump(path, "Could not create parent directory")
|
||||
|
||||
try:
|
||||
# Ensuring we don't accidentally overwrite any existing files by
|
||||
# always asking before overwriting a file.
|
||||
try:
|
||||
# x = open for exclusive creation, failing if the file already
|
||||
# exists
|
||||
with open(path, "x") as f:
|
||||
self._parser.write(f)
|
||||
except FileExistsError:
|
||||
print("That file already exists.")
|
||||
if prompt_yes_no("Overwrite it?", default=False):
|
||||
with open(path, "w") as f:
|
||||
self._parser.write(f)
|
||||
else:
|
||||
self._fail_dump(path, "File already exists")
|
||||
except IsADirectoryError:
|
||||
self._fail_dump(path, "That's a directory, not a file")
|
||||
except PermissionError:
|
||||
self._fail_dump(path, "Insufficient permissions")
|
@ -1,98 +1,25 @@
|
||||
"""
|
||||
A few utility bobs and bits.
|
||||
"""
|
||||
|
||||
import re
|
||||
from pathlib import Path, PurePath
|
||||
from typing import Optional, Tuple, Union
|
||||
|
||||
import bs4
|
||||
import requests
|
||||
|
||||
from .progress import ProgressSettings, progress_for, size_from_headers
|
||||
|
||||
PathLike = Union[PurePath, str, Tuple[str, ...]]
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def to_path(pathlike: PathLike) -> Path:
|
||||
def prompt_yes_no(query: str, default: Optional[bool]) -> bool:
|
||||
"""
|
||||
Convert a given PathLike into a Path.
|
||||
"""
|
||||
if isinstance(pathlike, tuple):
|
||||
return Path(*pathlike)
|
||||
return Path(pathlike)
|
||||
|
||||
|
||||
Regex = Union[str, re.Pattern]
|
||||
|
||||
|
||||
def to_pattern(regex: Regex) -> re.Pattern:
|
||||
"""
|
||||
Convert a regex to a re.Pattern.
|
||||
"""
|
||||
if isinstance(regex, re.Pattern):
|
||||
return regex
|
||||
return re.compile(regex)
|
||||
|
||||
|
||||
def soupify(response: requests.Response) -> bs4.BeautifulSoup:
|
||||
"""
|
||||
Wrap a requests response in a bs4 object.
|
||||
"""
|
||||
|
||||
return bs4.BeautifulSoup(response.text, "html.parser")
|
||||
|
||||
|
||||
def stream_to_path(
|
||||
response: requests.Response,
|
||||
target: Path,
|
||||
progress_name: Optional[str] = None,
|
||||
chunk_size: int = 1024 ** 2
|
||||
) -> None:
|
||||
"""
|
||||
Download a requests response content to a file by streaming it. This
|
||||
function avoids excessive memory usage when downloading large files. The
|
||||
chunk_size is in bytes.
|
||||
|
||||
If progress_name is None, no progress bar will be shown. Otherwise a progress
|
||||
bar will appear, if the download is bigger than an internal threshold.
|
||||
"""
|
||||
|
||||
with response:
|
||||
length = size_from_headers(response)
|
||||
if progress_name and length and int(length) > 1024 * 1024 * 10: # 10 MiB
|
||||
settings: Optional[ProgressSettings] = ProgressSettings(progress_name, length)
|
||||
else:
|
||||
settings = None
|
||||
|
||||
with open(target, 'wb') as file_descriptor:
|
||||
with progress_for(settings) as progress:
|
||||
for chunk in response.iter_content(chunk_size=chunk_size):
|
||||
file_descriptor.write(chunk)
|
||||
progress.advance(len(chunk))
|
||||
|
||||
|
||||
def prompt_yes_no(question: str, default: Optional[bool] = None) -> bool:
|
||||
"""
|
||||
Prompts the user a yes/no question and returns their choice.
|
||||
Asks the user a yes/no question and returns their choice.
|
||||
"""
|
||||
|
||||
if default is True:
|
||||
prompt = "[Y/n]"
|
||||
query += " [Y/n] "
|
||||
elif default is False:
|
||||
prompt = "[y/N]"
|
||||
query += " [y/N] "
|
||||
else:
|
||||
prompt = "[y/n]"
|
||||
|
||||
text = f"{question} {prompt} "
|
||||
wrong_reply = "Please reply with 'yes'/'y' or 'no'/'n'."
|
||||
query += " [y/n] "
|
||||
|
||||
while True:
|
||||
response = input(text).strip().lower()
|
||||
if response in {"yes", "ye", "y"}:
|
||||
response = input(query).strip().lower()
|
||||
if response == "y":
|
||||
return True
|
||||
if response in {"no", "n"}:
|
||||
elif response == "n":
|
||||
return False
|
||||
if response == "" and default is not None:
|
||||
elif response == "" and default is not None:
|
||||
return default
|
||||
print(wrong_reply)
|
||||
|
||||
print("Please answer with 'y' or 'n'.")
|
||||
|
Loading…
Reference in New Issue
Block a user