pferd/PFERD/config.py

167 lines
4.9 KiB
Python
Raw Normal View History

2021-05-15 21:33:51 +02:00
import asyncio
2021-04-27 12:41:49 +02:00
import os
2021-05-15 21:33:51 +02:00
import sys
2021-05-05 23:45:10 +02:00
from configparser import ConfigParser, SectionProxy
from dataclasses import dataclass
2021-04-27 12:41:49 +02:00
from pathlib import Path
2021-05-05 23:45:10 +02:00
from typing import Any, List, NoReturn, Optional, Tuple
2021-04-27 12:41:49 +02:00
2021-05-19 18:10:17 +02:00
from .logging import log
2021-04-27 12:41:49 +02:00
from .utils import prompt_yes_no
2021-05-19 18:10:17 +02:00
@dataclass
class ConfigLoadException(Exception):
    """
    Raised when a config file could not be loaded.
    """

    # Path of the config file that was being loaded.
    path: Path
    # Human-readable explanation of why loading failed.
    reason: str
2021-04-27 12:41:49 +02:00
class ConfigDumpException(Exception):
    """
    Raised when the config could not be dumped to a file.
    """
2021-05-05 23:45:10 +02:00
@dataclass
class ConfigFormatException(Exception):
    """
    Raised when a value in the config is missing or invalid.
    """

    # Name of the config section containing the offending key.
    section: str
    # Name of the offending key.
    key: str
    # Description of what is wrong with the key or its value.
    desc: str
class Section:
    """
    Base class for the crawler and auth section classes.

    Wraps a single SectionProxy and offers helpers that report problems with
    that section's keys by raising a ConfigFormatException.
    """

    def __init__(self, section: SectionProxy):
        self.s = section

    def error(self, key: str, desc: str) -> NoReturn:
        """
        Raise a ConfigFormatException for the given key of this section.
        """

        section_name = self.s.name
        raise ConfigFormatException(section_name, key, desc)

    def invalid_value(
            self,
            key: str,
            value: Any,
            reason: Optional[str],
    ) -> NoReturn:
        """
        Report that a key holds an invalid value. If a reason is given, it is
        appended to the error description.
        """

        description = f"Invalid value {value!r}"
        if reason is not None:
            description = f"{description}: {reason}"
        self.error(key, description)

    def missing_value(self, key: str) -> NoReturn:
        """
        Report that a key has no value.
        """

        self.error(key, "Missing value")
class DefaultSection(Section):
    """
    Accessors for the options stored in the parser's default section.
    """

    def working_dir(self) -> Path:
        """
        Return the "working_dir" option as a path with "~" expanded, falling
        back to "." if the option is not set.
        """

        return Path(self.s.get("working_dir", fallback=".")).expanduser()

    def explain(self) -> bool:
        """
        Return the boolean "explain" option, falling back to False if the
        option is not set.
        """

        explain_enabled = self.s.getboolean("explain", fallback=False)
        return explain_enabled
2021-04-27 12:41:49 +02:00
class Config:
    """
    Wrapper around a ConfigParser that knows the default config file location
    and can load the config, dump it and list its crawler and authenticator
    sections.
    """

    @staticmethod
    def _default_path() -> Path:
        """
        Return the default config file path for the current os.name, with "~"
        expanded.
        """

        if os.name == "posix":
            return Path("~/.config/PFERD/pferd.cfg").expanduser()
        elif os.name == "nt":
            return Path("~/AppData/Roaming/PFERD/pferd.cfg").expanduser()
        else:
            return Path("~/.pferd.cfg").expanduser()

    def __init__(self, parser: ConfigParser):
        self._parser = parser
        self._default_section = DefaultSection(parser[parser.default_section])

    @property
    def default_section(self) -> DefaultSection:
        """
        The wrapper around the parser's default section.
        """

        return self._default_section

    @staticmethod
    def load_parser(parser: ConfigParser, path: Optional[Path] = None) -> None:
        """
        Read the config file at the given path (or at the default path if no
        path is given) into the parser.

        May throw a ConfigLoadException.
        """

        if path:
            log.explain("Using custom path")
        else:
            log.explain("Using default path")
            path = Config._default_path()
        log.explain(f"Loading {str(path)!r}")

        # Using config.read_file instead of config.read because config.read
        # would just ignore a missing file and carry on.
        try:
            with open(path) as f:
                parser.read_file(f, source=str(path))
        except FileNotFoundError as err:
            # Chain explicitly so the underlying OS error stays attached as
            # the cause of the ConfigLoadException.
            raise ConfigLoadException(path, "File does not exist") from err
        except IsADirectoryError as err:
            raise ConfigLoadException(path, "That's a directory, not a file") from err
        except PermissionError as err:
            raise ConfigLoadException(path, "Insufficient permissions") from err

    @staticmethod
    def _fail_dump(path: Path, reason: str) -> NoReturn:
        """
        Print why dumping to the given path failed, then raise a
        ConfigDumpException. Never returns normally.
        """

        print(f"Failed to dump config file to {path}")
        print(f"Reason: {reason}")
        raise ConfigDumpException()

    def dump(self, path: Optional[Path] = None) -> None:
        """
        Write the config to the given path (or to the default path if no path
        is given), creating parent directories as needed. If the file already
        exists, the user is asked before it is overwritten.

        May throw a ConfigDumpException.
        """

        if not path:
            path = self._default_path()

        print(f"Dumping config to {path}")

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
        except PermissionError:
            self._fail_dump(path, "Could not create parent directory")

        try:
            # Ensuring we don't accidentally overwrite any existing files by
            # always asking before overwriting a file.
            try:
                # x = open for exclusive creation, failing if the file already
                # exists
                with open(path, "x") as f:
                    self._parser.write(f)
            except FileExistsError:
                print("That file already exists.")
                if asyncio.run(prompt_yes_no("Overwrite it?", default=False)):
                    with open(path, "w") as f:
                        self._parser.write(f)
                else:
                    self._fail_dump(path, "File already exists")
        except IsADirectoryError:
            self._fail_dump(path, "That's a directory, not a file")
        except PermissionError:
            self._fail_dump(path, "Insufficient permissions")

    def dump_to_stdout(self) -> None:
        """
        Write the config to sys.stdout.
        """

        self._parser.write(sys.stdout)

    def _sections_with_prefix(self, prefix: str) -> List[Tuple[str, SectionProxy]]:
        """
        Return (name, proxy) pairs for all sections whose name starts with the
        given prefix, in the order the parser yields them.
        """

        return [
            (name, proxy)
            for name, proxy in self._parser.items()
            if name.startswith(prefix)
        ]

    def crawler_sections(self) -> List[Tuple[str, SectionProxy]]:
        """
        Return (name, proxy) pairs for all sections named "crawl:...".
        """

        return self._sections_with_prefix("crawl:")

    def authenticator_sections(self) -> List[Tuple[str, SectionProxy]]:
        """
        Return (name, proxy) pairs for all sections named "auth:...".
        """

        return self._sections_with_prefix("auth:")