Prefix internal attributes with underscore

This commit is contained in:
Unrud 2020-01-19 18:01:13 +01:00
parent 4f6a342211
commit e07df9fd1d
12 changed files with 65 additions and 66 deletions

View File

@ -96,7 +96,7 @@ class Application(
return request_environ
def decode(self, text, environ):
def _decode(self, text, environ):
"""Try to magically decode ``text`` according to given ``environ``."""
# List of charsets to try
charsets = []
@ -238,7 +238,7 @@ class Application(
login, password = login or "", password or ""
elif authorization.startswith("Basic"):
authorization = authorization[len("Basic"):].strip()
login, password = self.decode(base64.b64decode(
login, password = self._decode(base64.b64decode(
authorization.encode("ascii")), environ).split(":", 1)
user = self._auth.login(login, password) or "" if login else ""
@ -312,7 +312,7 @@ class Application(
return response(status, headers, answer)
def access(self, user, path, permission, item=None):
def _access(self, user, path, permission, item=None):
if permission not in "rw":
raise ValueError("Invalid permission argument: %r" % permission)
if not item:
@ -336,7 +336,7 @@ class Application(
return True
return False
def read_raw_content(self, environ):
def _read_raw_content(self, environ):
content_length = int(environ.get("CONTENT_LENGTH") or 0)
if not content_length:
return b""
@ -345,13 +345,13 @@ class Application(
raise RuntimeError("Request body too short: %d" % len(content))
return content
def read_content(self, environ):
content = self.decode(self.read_raw_content(environ), environ)
def _read_content(self, environ):
content = self._decode(self._read_raw_content(environ), environ)
logger.debug("Request content:\n%s", content)
return content
def read_xml_content(self, environ):
content = self.decode(self.read_raw_content(environ), environ)
def _read_xml_content(self, environ):
content = self._decode(self._read_raw_content(environ), environ)
if not content:
return None
try:
@ -364,7 +364,7 @@ class Application(
xmlutils.pretty_xml(xml_content))
return xml_content
def write_xml_content(self, xml_content):
def _write_xml_content(self, xml_content):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Response content:\n%s",
xmlutils.pretty_xml(xml_content))
@ -373,10 +373,10 @@ class Application(
xml_declaration=True)
return f.getvalue()
def webdav_error_response(self, namespace, name,
status=httputils.WEBDAV_PRECONDITION_FAILED[0]):
def _webdav_error_response(self, namespace, name,
status=httputils.WEBDAV_PRECONDITION_FAILED[0]):
"""Generate XML error response."""
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
content = self.write_xml_content(
content = self._write_xml_content(
xmlutils.webdav_error(namespace, name))
return status, headers, content

View File

@ -49,13 +49,13 @@ def xml_delete(base_prefix, path, collection, href=None):
class ApplicationDeleteMixin:
def do_DELETE(self, environ, base_prefix, path, user):
"""Manage DELETE request."""
if not self.access(user, path, "w"):
if not self._access(user, path, "w"):
return httputils.NOT_ALLOWED
with self._storage.acquire_lock("w", user):
item = next(self._storage.discover(path), None)
if not item:
return httputils.NOT_FOUND
if not self.access(user, path, "w", item):
if not self._access(user, path, "w", item):
return httputils.NOT_ALLOWED
if_match = environ.get("HTTP_IF_MATCH", "*")
if if_match not in ("*", item.etag):
@ -67,4 +67,4 @@ class ApplicationDeleteMixin:
xml_answer = xml_delete(
base_prefix, path, item.collection, item.href)
headers = {"Content-Type": "text/xml; charset=%s" % self._encoding}
return client.OK, headers, self.write_xml_content(xml_answer)
return client.OK, headers, self._write_xml_content(xml_answer)

View File

@ -70,13 +70,13 @@ class ApplicationGetMixin:
# Dispatch .web URL to web module
if path == "/.web" or path.startswith("/.web/"):
return self._web.get(environ, base_prefix, path, user)
if not self.access(user, path, "r"):
if not self._access(user, path, "r"):
return httputils.NOT_ALLOWED
with self._storage.acquire_lock("r", user):
item = next(self._storage.discover(path), None)
if not item:
return httputils.NOT_FOUND
if not self.access(user, path, "r", item):
if not self._access(user, path, "r", item):
return httputils.NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
tag = item.get_meta("tag")

View File

@ -33,7 +33,7 @@ class ApplicationMkcalendarMixin:
if not self._rights.authorized(user, path, "w"):
return httputils.NOT_ALLOWED
try:
xml_content = self.read_xml_content(environ)
xml_content = self._read_xml_content(environ)
except RuntimeError as e:
logger.warning(
"Bad MKCALENDAR request on %r: %s", path, e, exc_info=True)
@ -54,7 +54,7 @@ class ApplicationMkcalendarMixin:
with self._storage.acquire_lock("w", user):
item = next(self._storage.discover(path), None)
if item:
return self.webdav_error_response(
return self._webdav_error_response(
"D", "resource-must-be-null")
parent_path = pathutils.unstrip_path(
posixpath.dirname(pathutils.strip_path(path)), True)

View File

@ -34,7 +34,7 @@ class ApplicationMkcolMixin:
if not permissions:
return httputils.NOT_ALLOWED
try:
xml_content = self.read_xml_content(environ)
xml_content = self._read_xml_content(environ)
except RuntimeError as e:
logger.warning(
"Bad MKCOL request on %r: %s", path, e, exc_info=True)

View File

@ -34,7 +34,7 @@ class ApplicationMoveMixin:
logger.info("Unsupported destination address: %r", raw_dest)
# Remote destination server, not supported
return httputils.REMOTE_DESTINATION
if not self.access(user, path, "w"):
if not self._access(user, path, "w"):
return httputils.NOT_ALLOWED
to_path = pathutils.sanitize_path(to_url.path)
if not (to_path + "/").startswith(base_prefix + "/"):
@ -42,15 +42,15 @@ class ApplicationMoveMixin:
"start with base prefix", to_path, path)
return httputils.NOT_ALLOWED
to_path = to_path[len(base_prefix):]
if not self.access(user, to_path, "w"):
if not self._access(user, to_path, "w"):
return httputils.NOT_ALLOWED
with self._storage.acquire_lock("w", user):
item = next(self._storage.discover(path), None)
if not item:
return httputils.NOT_FOUND
if (not self.access(user, path, "w", item) or
not self.access(user, to_path, "w", item)):
if (not self._access(user, path, "w", item) or
not self._access(user, to_path, "w", item)):
return httputils.NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
# TODO: support moving collections
@ -74,7 +74,7 @@ class ApplicationMoveMixin:
not to_item and
to_collection.path != item.collection.path and
to_collection.has_uid(item.uid)):
return self.webdav_error_response(
return self._webdav_error_response(
"C" if tag == "VCALENDAR" else "CR", "no-uid-conflict")
to_href = posixpath.basename(pathutils.strip_path(to_path))
try:

View File

@ -357,10 +357,10 @@ class ApplicationPropfindMixin:
def do_PROPFIND(self, environ, base_prefix, path, user):
"""Manage PROPFIND request."""
if not self.access(user, path, "r"):
if not self._access(user, path, "r"):
return httputils.NOT_ALLOWED
try:
xml_content = self.read_xml_content(environ)
xml_content = self._read_xml_content(environ)
except RuntimeError as e:
logger.warning(
"Bad PROPFIND request on %r: %s", path, e, exc_info=True)
@ -375,7 +375,7 @@ class ApplicationPropfindMixin:
item = next(items, None)
if not item:
return httputils.NOT_FOUND
if not self.access(user, path, "r", item):
if not self._access(user, path, "r", item):
return httputils.NOT_ALLOWED
# put item back
items = itertools.chain([item], items)
@ -387,4 +387,4 @@ class ApplicationPropfindMixin:
self._encoding)
if status == client.FORBIDDEN:
return httputils.NOT_ALLOWED
return status, headers, self.write_xml_content(xml_answer)
return status, headers, self._write_xml_content(xml_answer)

View File

@ -87,10 +87,10 @@ def xml_proppatch(base_prefix, path, xml_request, collection):
class ApplicationProppatchMixin:
def do_PROPPATCH(self, environ, base_prefix, path, user):
"""Manage PROPPATCH request."""
if not self.access(user, path, "w"):
if not self._access(user, path, "w"):
return httputils.NOT_ALLOWED
try:
xml_content = self.read_xml_content(environ)
xml_content = self._read_xml_content(environ)
except RuntimeError as e:
logger.warning(
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
@ -102,7 +102,7 @@ class ApplicationProppatchMixin:
item = next(self._storage.discover(path), None)
if not item:
return httputils.NOT_FOUND
if not self.access(user, path, "w", item):
if not self._access(user, path, "w", item):
return httputils.NOT_ALLOWED
if not isinstance(item, storage.BaseCollection):
return httputils.FORBIDDEN
@ -116,4 +116,4 @@ class ApplicationProppatchMixin:
"Bad PROPPATCH request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
return (client.MULTI_STATUS, headers,
self.write_xml_content(xml_answer))
self._write_xml_content(xml_answer))

View File

@ -114,10 +114,10 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
class ApplicationPutMixin:
def do_PUT(self, environ, base_prefix, path, user):
"""Manage PUT request."""
if not self.access(user, path, "w"):
if not self._access(user, path, "w"):
return httputils.NOT_ALLOWED
try:
content = self.read_content(environ)
content = self._read_content(environ)
except RuntimeError as e:
logger.warning("Bad PUT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
@ -201,7 +201,7 @@ class ApplicationPutMixin:
prepared_item, = prepared_items
if (item and item.uid != prepared_item.uid or
not item and parent_item.has_uid(prepared_item.uid)):
return self.webdav_error_response(
return self._webdav_error_response(
"C" if tag == "VCALENDAR" else "CR",
"no-uid-conflict")

View File

@ -257,10 +257,10 @@ def xml_item_response(base_prefix, href, found_props=(), not_found_props=(),
class ApplicationReportMixin:
def do_REPORT(self, environ, base_prefix, path, user):
"""Manage REPORT request."""
if not self.access(user, path, "r"):
if not self._access(user, path, "r"):
return httputils.NOT_ALLOWED
try:
xml_content = self.read_xml_content(environ)
xml_content = self._read_xml_content(environ)
except RuntimeError as e:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
@ -273,7 +273,7 @@ class ApplicationReportMixin:
item = next(self._storage.discover(path), None)
if not item:
return httputils.NOT_FOUND
if not self.access(user, path, "r", item):
if not self._access(user, path, "r", item):
return httputils.NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
collection = item
@ -288,4 +288,4 @@ class ApplicationReportMixin:
logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return httputils.BAD_REQUEST
return (status, headers, self.write_xml_content(xml_answer))
return (status, headers, self._write_xml_content(xml_answer))

View File

@ -63,24 +63,24 @@ from radicale import auth
class Auth(auth.BaseAuth):
def __init__(self, configuration):
super().__init__(configuration)
self.filename = configuration.get("auth", "htpasswd_filename")
self.encryption = configuration.get("auth", "htpasswd_encryption")
self._filename = configuration.get("auth", "htpasswd_filename")
encryption = configuration.get("auth", "htpasswd_encryption")
if self.encryption == "ssha":
self.verify = self._ssha
elif self.encryption == "sha1":
self.verify = self._sha1
elif self.encryption == "plain":
self.verify = self._plain
elif self.encryption == "md5":
if encryption == "ssha":
self._verify = self._ssha
elif encryption == "sha1":
self._verify = self._sha1
elif encryption == "plain":
self._verify = self._plain
elif encryption == "md5":
try:
from passlib.hash import apr_md5_crypt
except ImportError as e:
raise RuntimeError(
"The htpasswd encryption method 'md5' requires "
"the passlib module.") from e
self.verify = functools.partial(self._md5apr1, apr_md5_crypt)
elif self.encryption == "bcrypt":
self._verify = functools.partial(self._md5apr1, apr_md5_crypt)
elif encryption == "bcrypt":
try:
from passlib.hash import bcrypt
except ImportError as e:
@ -91,19 +91,18 @@ class Auth(auth.BaseAuth):
# good error message if bcrypt backend is not available. Trigger
# this here.
bcrypt.hash("test-bcrypt-backend")
self.verify = functools.partial(self._bcrypt, bcrypt)
elif self.encryption == "crypt":
self._verify = functools.partial(self._bcrypt, bcrypt)
elif encryption == "crypt":
try:
import crypt
except ImportError as e:
raise RuntimeError(
"The htpasswd encryption method 'crypt' requires "
"the crypt() system support.") from e
self.verify = functools.partial(self._crypt, crypt)
self._verify = functools.partial(self._crypt, crypt)
else:
raise RuntimeError(
"The htpasswd encryption method %r is not "
"supported." % self.encryption)
raise RuntimeError("The htpasswd encryption method %r is not "
"supported." % encryption)
def _plain(self, hash_value, password):
"""Check if ``hash_value`` and ``password`` match, plain method."""
@ -162,7 +161,7 @@ class Auth(auth.BaseAuth):
"""
try:
with open(self.filename) as f:
with open(self._filename) as f:
for line in f:
line = line.rstrip("\n")
if line.lstrip() and not line.lstrip().startswith("#"):
@ -172,13 +171,13 @@ class Auth(auth.BaseAuth):
# Always compare both login and password to avoid
# timing attacks, see #591.
login_ok = hmac.compare_digest(hash_login, login)
password_ok = self.verify(hash_value, password)
password_ok = self._verify(hash_value, password)
if login_ok and password_ok:
return login
except ValueError as e:
raise RuntimeError("Invalid htpasswd file %r: %s" %
(self.filename, e)) from e
(self._filename, e)) from e
except OSError as e:
raise RuntimeError("Failed to load htpasswd file %r: %s" %
(self.filename, e)) from e
(self._filename, e)) from e
return ""

View File

@ -43,7 +43,7 @@ from radicale.log import logger
class Rights(rights.BaseRights):
def __init__(self, configuration):
super().__init__(configuration)
self.filename = configuration.get("rights", "file")
self._filename = configuration.get("rights", "file")
def authorized(self, user, path, permissions):
user = user or ""
@ -54,12 +54,12 @@ class Rights(rights.BaseRights):
rights_config = configparser.ConfigParser(
{"login": user_escaped, "path": sane_path_escaped})
try:
if not rights_config.read(self.filename):
if not rights_config.read(self._filename):
raise RuntimeError("No such file: %r" %
self.filename)
self._filename)
except Exception as e:
raise RuntimeError("Failed to load rights file %r: %s" %
(self.filename, e)) from e
(self._filename, e)) from e
for section in rights_config.sections():
try:
user_pattern = rights_config.get(section, "user")
@ -70,7 +70,7 @@ class Rights(rights.BaseRights):
*map(re.escape, user_match.groups())), sane_path)
except Exception as e:
raise RuntimeError("Error in section %r of rights file %r: "
"%s" % (section, self.filename, e)) from e
"%s" % (section, self._filename, e)) from e
if user_match and collection_match:
logger.debug("Rule %r:%r matches %r:%r from section %r",
user, sane_path, user_pattern,