2021-05-23 20:46:12 +02:00
|
|
|
import json
|
|
|
|
from pathlib import Path, PurePath
|
2021-10-30 18:09:05 +02:00
|
|
|
from typing import Any, Dict, List, Optional, Set
|
2021-05-23 20:46:12 +02:00
|
|
|
|
|
|
|
|
|
|
|
class ReportLoadError(Exception):
    """
    Raised when a stored report does not have the expected structure.
    """
|
2021-05-02 00:56:10 +02:00
|
|
|
|
|
|
|
|
2021-05-22 20:54:42 +02:00
|
|
|
class MarkDuplicateError(Exception):
    """
    Raised when a path is marked although it has already been marked before.

    The offending path is available as the ``path`` attribute.
    """

    def __init__(self, path: PurePath):
        self.path = path
        message = f"A previous file already used path {path}"
        super().__init__(message)
|
2021-05-02 00:56:10 +02:00
|
|
|
|
|
|
|
|
2021-05-22 20:54:42 +02:00
|
|
|
class MarkConflictError(Exception):
    """
    Raised when marking a path would clash with an already marked path.

    There are two ways this can happen: the new file sits at the path of a
    directory that contains a known file, or one of the new file's parent
    directories sits at the path of a known file. Both cases would need a
    file and a directory to live at the same path, which is usually not
    possible.

    The new path is available as ``path``, the previously marked path it
    clashes with as ``collides_with``.
    """

    def __init__(self, path: PurePath, collides_with: PurePath):
        self.path = path
        self.collides_with = collides_with
        message = f"File at {path} collides with previous file at {collides_with}"
        super().__init__(message)
|
2021-05-02 00:56:10 +02:00
|
|
|
|
|
|
|
|
2021-05-06 01:02:40 +02:00
|
|
|
# TODO Use PurePath.is_relative_to when updating to 3.9
def is_relative_to(a: PurePath, b: PurePath) -> bool:
    """
    Return True if path a can be expressed relative to path b, i. e. if
    a.relative_to(b) succeeds, and False if it raises a ValueError.
    """
    try:
        a.relative_to(b)
    except ValueError:
        return False

    return True
|
|
|
|
|
|
|
|
|
2021-05-02 00:56:10 +02:00
|
|
|
class Report:
    """
    A report of a synchronization. Includes all files found by the crawler, as
    well as the set of changes made to local files.
    """

    def __init__(self) -> None:
        # Paths found by the crawler, untransformed
        self.found_paths: Set[PurePath] = set()

        # Files reserved for metadata files (e. g. the report file or cookies)
        # that can't be overwritten by user transforms and won't be cleaned up
        # at the end.
        self.reserved_files: Set[PurePath] = set()

        # Files found by the crawler, transformed. Only includes files that
        # were downloaded (or a download was attempted)
        self.known_files: Set[PurePath] = set()

        # Changes recorded via add_file(), change_file() and delete_file()
        self.added_files: Set[PurePath] = set()
        self.changed_files: Set[PurePath] = set()
        self.deleted_files: Set[PurePath] = set()
        # Files that should have been deleted by the cleanup but weren't
        self.not_deleted_files: Set[PurePath] = set()

        # Custom crawler-specific data
        self.custom: Dict[str, Any] = dict()

        # Encountered errors and warnings
        self.encountered_warnings: List[str] = []
        self.encountered_errors: List[str] = []

    @staticmethod
    def _get_list_of_strs(data: Dict[str, Any], key: str) -> List[str]:
        """
        Return data[key] as a list of strings, or an empty list if the key is
        missing.

        Raises a ReportLoadError if the value is not a list or contains
        anything other than strings.
        """

        result: Any = data.get(key, [])

        if not isinstance(result, list):
            raise ReportLoadError(f"Incorrect format: {key!r} is not a list")

        if not all(isinstance(elem, str) for elem in result):
            raise ReportLoadError(f"Incorrect format: {key!r} must contain only strings")

        return result

    @staticmethod
    def _get_str_dictionary(data: Dict[str, Any], key: str) -> Dict[str, Any]:
        """
        Return data[key] as a dictionary, or an empty dictionary if the key is
        missing.

        Only the value itself is checked, not its keys or values. Raises a
        ReportLoadError if the value is not a dictionary.
        """

        # Typed as Any until the isinstance check below has confirmed the shape
        result: Any = data.get(key, {})

        if not isinstance(result, dict):
            raise ReportLoadError(f"Incorrect format: {key!r} is not a dictionary")

        return result

    @classmethod
    def load(cls, path: Path) -> "Report":
        """
        Load a report from the JSON file at the given path.

        May raise OSError, UnicodeDecodeError, json.JSONDecodeError,
        ReportLoadError.

        The stored "reserved" and "known" entries are replayed through
        mark_reserved() and mark(). A file with duplicate or conflicting
        entries therefore surfaces the RuntimeError, MarkDuplicateError or
        MarkConflictError raised by those methods.
        """

        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        if not isinstance(data, dict):
            raise ReportLoadError("Incorrect format: Root is not an object")

        report = cls()

        # Each key is validated right before its entries are replayed. The
        # order matters: reserved files must be in place before known files
        # are marked so that mark() can detect conflicts with them.
        replay = (
            ("found", report.found),
            ("reserved", report.mark_reserved),
            ("known", report.mark),
            ("added", report.add_file),
            ("changed", report.change_file),
            ("deleted", report.delete_file),
            ("not_deleted", report.not_delete_file),
        )
        for key, record in replay:
            for elem in cls._get_list_of_strs(data, key):
                record(PurePath(elem))

        report.custom = cls._get_str_dictionary(data, "custom")
        report.encountered_errors = cls._get_list_of_strs(data, "encountered_errors")
        report.encountered_warnings = cls._get_list_of_strs(data, "encountered_warnings")

        return report

    def store(self, path: Path) -> None:
        """
        Write this report as JSON to the file at the given path.

        May raise OSError. json.dump additionally raises a TypeError if the
        custom data contains values that can't be serialized to JSON.
        """

        def as_strs(paths: Set[PurePath]) -> List[str]:
            # Sorted so that the output doesn't depend on set iteration order
            return [str(p) for p in sorted(paths)]

        data = {
            "found": as_strs(self.found_paths),
            "reserved": as_strs(self.reserved_files),
            "known": as_strs(self.known_files),
            "added": as_strs(self.added_files),
            "changed": as_strs(self.changed_files),
            "deleted": as_strs(self.deleted_files),
            "not_deleted": as_strs(self.not_deleted_files),
            "custom": self.custom,
            "encountered_warnings": self.encountered_warnings,
            "encountered_errors": self.encountered_errors,
        }

        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, sort_keys=True)
            f.write("\n")  # json.dump doesn't do this

    def found(self, path: PurePath) -> None:
        """
        Record an untransformed path found by the crawler.
        """

        self.found_paths.add(path)

    def mark_reserved(self, path: PurePath) -> None:
        """
        Reserve a path for a metadata file.

        Raises a RuntimeError if the path is already marked, i. e. if it is
        either a known or a reserved file. Note that the error message only
        mentions the reserved case.
        """

        # Checked against both sets directly to avoid building their union
        if path in self.known_files or path in self.reserved_files:
            raise RuntimeError("Trying to reserve an already reserved file")

        self.reserved_files.add(path)

    def mark(self, path: PurePath) -> None:
        """
        Mark a previously unknown file as known.

        May throw a MarkDuplicateError or a MarkConflictError. For more detail,
        see the respective exception's docstring.
        """

        for other in self.marked:
            if path == other:
                raise MarkDuplicateError(path)

            # One path being located below the other means that one of them
            # would have to be a file and a directory at the same time
            if is_relative_to(path, other) or is_relative_to(other, path):
                raise MarkConflictError(path, other)

        self.known_files.add(path)

    @property
    def marked(self) -> Set[PurePath]:
        """
        All known and reserved files, as a new set created on every access.
        """

        return self.known_files | self.reserved_files

    def is_marked(self, path: PurePath) -> bool:
        """
        Return whether the path is a known or a reserved file.
        """

        # Checked against both sets directly to avoid building their union
        return path in self.known_files or path in self.reserved_files

    def add_file(self, path: PurePath) -> None:
        """
        Record a file as added.

        Unlike mark(), this function accepts any paths.
        """

        self.added_files.add(path)

    def change_file(self, path: PurePath) -> None:
        """
        Record a file as changed.

        Unlike mark(), this function accepts any paths.
        """

        self.changed_files.add(path)

    def delete_file(self, path: PurePath) -> None:
        """
        Record a file as deleted.

        Unlike mark(), this function accepts any paths.
        """

        self.deleted_files.add(path)

    def not_delete_file(self, path: PurePath) -> None:
        """
        Record a file that should have been deleted by the cleanup but wasn't.

        Unlike mark(), this function accepts any paths.
        """

        self.not_deleted_files.add(path)

    def add_custom_value(self, key: str, value: Any) -> None:
        """
        Adds a custom value under the passed key, overwriting any existing
        value stored under that key.
        """

        self.custom[key] = value

    def get_custom_value(self, key: str) -> Optional[Any]:
        """
        Retrieves a custom value for the given key, or None if there is no
        value for that key.
        """

        return self.custom.get(key)

    def add_error(self, error: str) -> None:
        """
        Adds an error to this report's error list.
        """

        self.encountered_errors.append(error)

    def add_warning(self, warning: str) -> None:
        """
        Adds a warning to this report's warning list.
        """

        self.encountered_warnings.append(warning)
|