Listen to pylint and mypy

This commit is contained in:
Joscha 2020-04-20 17:27:26 +00:00
parent ce77995c8f
commit 4ef0ffe3bf
9 changed files with 87 additions and 197 deletions

View File

@ -1,3 +1,5 @@
# pylint: disable=invalid-name
"""
This module exports only what you need for a basic configuration. If you want a
more complex configuration, you need to import the other submodules manually.

View File

@ -35,12 +35,12 @@ class CookieJar:
return
try:
LOGGER.info(f"Loading old cookies from {self._cookies.filename}")
LOGGER.info("Loading old cookies from %s", self._cookies.filename)
self._cookies.load(ignore_discard=True)
except (FileNotFoundError, LoadError):
LOGGER.warning(
f"No valid cookie file found at {self._cookies.filename}, "
"continuing with no cookies"
"No valid cookie file found at %s, continuing with no cookies",
self._cookies.filename
)
def save_cookies(self, reason: Optional[str] = None) -> None:
@ -51,7 +51,7 @@ class CookieJar:
if reason is None:
LOGGER.info("Saving cookies")
else:
LOGGER.info(f"Saving cookies ({reason})")
LOGGER.info("Saving cookies (%s)", reason)
# TODO figure out why ignore_discard is set
# TODO possibly catch a few more exceptions

View File

@ -13,6 +13,7 @@ from .tmp_dir import TmpDir
from .utils import stream_to_path
# pylint: disable=too-few-public-methods
class HttpDownloader():
"""A HTTP downloader that can handle HTTP basic auth."""

View File

@ -1,109 +0,0 @@
# ILIAS
import logging
import pathlib
import re
from .ilias_authenticators import ShibbolethAuthenticator
from .organizer import Organizer
from .utils import PrettyLogger
__all__ = ["Ilias"]
logger = logging.getLogger(__name__)
pretty = PrettyLogger(logger)
class Ilias:
FILE_RE = re.compile(r"goto\.php\?target=(file_\d+_download)")
DIR_RE = re.compile(r"ilias\.php\?ref_id=(\d+)")
def __init__(self, base_path, cookie_file):
self.base_path = base_path
self._auth = ShibbolethAuthenticator(base_path / cookie_file)
def synchronize(self, ref_id, to_dir, transform=lambda x: x, filter=lambda x: True):
pretty.starting_synchronizer(to_dir, "ILIAS", f"ref_id {ref_id}")
sync_path = pathlib.Path(self.base_path, to_dir)
orga = Organizer(self.base_path, sync_path)
orga.clean_temp_dir()
files = self._crawl(pathlib.PurePath(), f"fold_{ref_id}", filter)
self._download(orga, files, transform)
orga.clean_sync_dir()
orga.clean_temp_dir()
def _crawl(self, dir_path, dir_id, filter_):
soup = self._auth.get_webpage(dir_id)
found_files = []
files = self._find_files(soup)
for (name, file_id) in files:
path = dir_path / name
found_files.append((path, file_id))
logger.debug(f"Found file {path}")
dirs = self._find_dirs(soup)
for (name, ref_id) in dirs:
path = dir_path / name
logger.debug(f"Found dir {path}")
if filter_(path):
logger.info(f"Searching {path}")
files = self._crawl(path, ref_id, filter_)
found_files.extend(files)
else:
logger.info(f"Not searching {path}")
return found_files
def _download(self, orga, files, transform):
for (path, file_id) in sorted(files):
to_path = transform(path)
if to_path is not None:
temp_path = orga.temp_file()
self._auth.download_file(file_id, temp_path)
orga.add_file(temp_path, to_path)
def _find_files(self, soup):
files = []
file_names = set()
found = soup.find_all("a", {"class": "il_ContainerItemTitle", "href": self.FILE_RE})
for element in found:
file_stem = element.string.strip().replace("/", ".")
file_type = element.parent.parent.parent.find("div", {"class": "il_ItemProperties"}).find("span").string.strip()
file_id = re.search(self.FILE_RE, element.get("href")).group(1)
file_name = f"{file_stem}.{file_type}"
if file_name in file_names:
counter = 1
while True:
file_name = f"{file_stem} (duplicate {counter}).{file_type}"
if file_name in file_names:
counter += 1
else:
break
files.append((file_name, file_id))
file_names.add(file_name)
return files
def _find_dirs(self, soup):
dirs = []
found = soup.find_all("div", {"class": "alert", "role": "alert"})
if found:
return []
found = soup.find_all("a", {"class": "il_ContainerItemTitle", "href": self.DIR_RE})
for element in found:
dir_name = element.string.strip().replace("/", ".")
ref_id = re.search(self.DIR_RE, element.get("href")).group(1)
dir_id = f"fold_{ref_id}"
dirs.append((dir_name, dir_id))
return dirs

View File

@ -16,6 +16,8 @@ LOGGER = logging.getLogger(__name__)
class IliasAuthenticator(abc.ABC):
# pylint: disable=too-few-public-methods
"""
An authenticator that logs an existing requests session into an ILIAS
account.
@ -29,6 +31,8 @@ class IliasAuthenticator(abc.ABC):
class KitShibbolethAuthenticator(IliasAuthenticator):
# pylint: disable=too-few-public-methods
"""
Authenticate via KIT's shibboleth system.
"""

View File

@ -15,11 +15,8 @@ from .authenticators import IliasAuthenticator
class ContentTypeException(Exception):
"""Thrown when the content type of the ilias element can not be handled."""
def __init__(self, message: str):
"""Create a new exception."""
super().__init__(message)
# pylint: disable=too-few-public-methods
class IliasDownloader():
"""A downloader for ILIAS."""
@ -43,24 +40,22 @@ class IliasDownloader():
self._organizer.accept_file(tmp_file, target_path)
def _try_download(self, url: str, target_path: Path, params: Dict[str, Any]) -> bool:
with self._session.get(url, params=params, stream=True) as r:
content_type = r.headers["content-type"]
with self._session.get(url, params=params, stream=True) as response:
content_type = response.headers["content-type"]
if content_type.startswith("text/html"):
# Dangit, we're probably not logged in.
soup = soupify(r)
soup = soupify(response)
if self._is_logged_in(soup):
raise ContentTypeException(
"Attempting to download a web page, not a file"
)
raise ContentTypeException("Attempting to download a web page, not a file")
return False
else:
# Yay, we got the file :)
stream_to_path(r, target_path)
stream_to_path(response, target_path)
return True
def _is_logged_in(self, soup: Any) -> bool:
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
userlog = soup.find("li", {"id": "userlog"})
return userlog is not None

View File

@ -9,42 +9,27 @@ import shutil
from pathlib import Path
from typing import List, Set
from .utils import PrettyLogger, prompt_yes_no, resolve_path
from .utils import Location, PrettyLogger, prompt_yes_no
logger = logging.getLogger(__name__)
pretty_logger = PrettyLogger(logger)
LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER)
class FileAcceptException(Exception):
"""An exception while accepting a file."""
def __init__(self, message: str):
"""Create a new exception."""
super().__init__(message)
class Organizer():
class Organizer(Location):
"""A helper for managing downloaded files."""
def __init__(self, path: Path):
"""Create a new organizer for a given path."""
self._path = path
super().__init__(path)
self._known_files: Set[Path] = set()
# Keep the root dir
self.mark(path)
@property
def path(self) -> Path:
"""Return the path for this organizer."""
return self._path
def resolve(self, target_file: Path) -> Path:
"""Resolve a file relative to the path of this organizer.
Raises an exception if the file is outside the given directory.
"""
return resolve_path(self.path, target_file)
def accept_file(self, src: Path, dst: Path) -> None:
"""Move a file to this organizer and mark it."""
src_absolute = src.resolve()
@ -56,28 +41,27 @@ class Organizer():
if not src_absolute.is_file():
raise FileAcceptException("Source is a directory")
logger.debug(f"Copying '{src_absolute}' to '{dst_absolute}")
LOGGER.debug("Copying %s to %s", src_absolute, dst_absolute)
# Destination file is directory
if dst_absolute.exists() and dst_absolute.is_dir():
if prompt_yes_no(f"Overwrite folder {dst_absolute} with file?", default=False):
shutil.rmtree(dst_absolute)
else:
logger.warn(f"Could not add file {dst_absolute}")
LOGGER.warning("Could not add file %s", dst_absolute)
return
# Destination file exists
if dst_absolute.exists() and dst_absolute.is_file():
if filecmp.cmp(str(src_absolute), str(dst_absolute), shallow=False):
pretty_logger.ignored_file(dst_absolute)
# Bail out, nothing more to do
PRETTY.ignored_file(dst_absolute)
self.mark(dst)
return
PRETTY.modified_file(dst_absolute)
else:
pretty_logger.modified_file(dst_absolute)
else:
pretty_logger.new_file(dst_absolute)
PRETTY.new_file(dst_absolute)
# Create parent dir if needed
dst_parent_dir: Path = dst_absolute.parent
@ -92,11 +76,11 @@ class Organizer():
"""Mark a file as used so it will not get cleaned up."""
absolute_path = self.path.joinpath(path).resolve()
self._known_files.add(absolute_path)
logger.debug(f"Tracked {absolute_path}")
LOGGER.debug("Tracked %s", absolute_path)
def cleanup(self) -> None:
"""Remove all untracked files in the organizer's dir."""
logger.debug("Deleting all untracked files...")
LOGGER.debug("Deleting all untracked files...")
self._cleanup(self.path)
@ -116,7 +100,8 @@ class Organizer():
if start_dir.resolve() not in self._known_files and dir_empty:
start_dir.rmdir()
def _delete_file_if_confirmed(self, path: Path) -> None:
@staticmethod
def _delete_file_if_confirmed(path: Path) -> None:
prompt = f"Do you want to delete {path}"
if prompt_yes_no(prompt, False):

View File

@ -6,19 +6,19 @@ from pathlib import Path
from types import TracebackType
from typing import Optional, Type
from .utils import resolve_path
from .utils import Location
logger = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
class TmpDir():
class TmpDir(Location):
"""A temporary folder that can create files or nested temp folders."""
def __init__(self, path: Path):
"""Create a new temporary folder for the given path."""
super().__init__(path)
self._counter = 0
self._path = path
self._path.mkdir(parents=True, exist_ok=True)
self.path.mkdir(parents=True, exist_ok=True)
def __str__(self) -> str:
"""Format the folder as a string."""
@ -28,31 +28,22 @@ class TmpDir():
"""Context manager entry function."""
return self
def __exit__(self,
type: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType]) -> Optional[bool]:
# pylint: disable=useless-return
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Context manager exit function. Calls cleanup()."""
self.cleanup()
return None
@property
def path(self) -> Path:
"""Return the path of this folder."""
return self._path
def resolve(self, target_file: Path) -> Path:
"""Resolve a file relative to this folder.
Raises a [ResolveException] if the path is outside the folder.
"""
return resolve_path(self.path, target_file)
def new_file(self, prefix: Optional[str] = None) -> Path:
"""Return a unique path inside the folder, but don't create a file."""
name = f"{prefix if prefix else 'tmp'}-{self._inc_and_get_counter():03}"
logger.debug(f"Creating temp file '{name}'")
LOGGER.debug("Creating temp file %s", name)
return self.resolve(Path(name))
@ -63,13 +54,13 @@ class TmpDir():
sub_path = self.resolve(Path(name))
sub_path.mkdir(parents=True)
logger.debug(f"Creating temp dir '{name}' at {sub_path}")
LOGGER.debug("Creating temp dir %s at %s", name, sub_path)
return TmpDir(sub_path)
def cleanup(self) -> None:
"""Delete this folder and all contained files."""
logger.debug(f"Deleting temp folder {self.path}")
LOGGER.debug("Deleting temp folder %s", self.path)
shutil.rmtree(self.path.resolve())

View File

@ -32,6 +32,10 @@ def rename(path: PurePath, to_name: str) -> PurePath:
def soupify(response: requests.Response) -> bs4.BeautifulSoup:
"""
Wrap a requests response in a bs4 object.
"""
return bs4.BeautifulSoup(response.text, "html.parser")
@ -42,6 +46,7 @@ def stream_to_path(response: requests.Response, to_path: Path, chunk_size: int =
chunk_size is in bytes.
"""
with response:
with open(to_path, 'wb') as file_descriptor:
for chunk in response.iter_content(chunk_size=chunk_size):
file_descriptor.write(chunk)
@ -77,17 +82,33 @@ class ResolveException(Exception):
"""An exception while resolving a file."""
def resolve_path(directory: Path, target_file: Path) -> Path:
"""Resolve a file relative to the path of this organizer.
class Location:
"""
An object that has an inherent path.
"""
def __init__(self, path: Path):
self._path = path
@property
def path(self) -> Path:
"""
This object's location.
"""
return self._path
def resolve(self, target: Path) -> Path:
"""
Resolve a file relative to the path of this location.
Raises a [ResolveException] if the file is outside the given directory.
"""
absolute_path = directory.joinpath(target_file).resolve()
absolute_path = self.path.joinpath(target).resolve()
if directory not in absolute_path.parents:
raise ResolveException(
f"Path resolved to file outside folder ({absolute_path})"
)
# TODO Make this less inefficient
if self.path not in absolute_path.parents:
raise ResolveException(f"Path {target} is not inside directory {self.path}")
return absolute_path