diff --git a/config b/config index e68699e..a0b04f9 100644 --- a/config +++ b/config @@ -78,6 +78,9 @@ # bcrypt and md5 require the passlib library to be installed. #htpasswd_encryption = bcrypt +# Incorrect authentication delay (seconds) +#delay = 1 + [rights] diff --git a/radicale/__init__.py b/radicale/__init__.py index d268ae0..b6da193 100644 --- a/radicale/__init__.py +++ b/radicale/__init__.py @@ -34,11 +34,13 @@ import itertools import os import posixpath import pprint +import random import socket import socketserver import ssl import sys import threading +import time import traceback import wsgiref.simple_server import zlib @@ -312,7 +314,7 @@ class Application: status = "%i %s" % ( status, client.responses.get(status, "Unknown")) self.logger.info( - "%s answer status for %s in %s sec: %s", + "%s answer status for %s in %.3f seconds: %s", environ["REQUEST_METHOD"], environ["PATH_INFO"] + depthinfo, (time_end - time_begin).total_seconds(), status) start_response(status, list(headers.items())) @@ -375,13 +377,22 @@ class Application: if path == "/.well-known" or path.startswith("/.well-known/"): return response(*NOT_FOUND) - if user and not storage.is_safe_path_component(user): + if not user: + is_authenticated = True + elif not storage.is_safe_path_component(user): # Prevent usernames like "user/calendar.ics" self.logger.info("Refused unsafe username: %s", user) is_authenticated = False else: is_authenticated = self.Auth.is_authenticated(user, password) - is_valid_user = is_authenticated or not user + if not is_authenticated: + self.logger.info("Failed login attempt: %s", user) + # Random delay to avoid timing oracles and bruteforce attacks + delay = self.configuration.getfloat("auth", "delay") + if delay > 0: + random_delay = delay * (0.5 + random.random()) + self.logger.debug("Sleeping %.3f seconds", random_delay) + time.sleep(random_delay) # Create principal collection if user and is_authenticated: @@ -405,19 +416,22 @@ class Application: "Request body too large: %d", content_length) return response(*REQUEST_ENTITY_TOO_LARGE) - if is_valid_user: + if is_authenticated: try: status, headers, answer = function( environ, base_prefix, path, user) except socket.timeout: return response(*REQUEST_TIMEOUT) + if (status, headers, answer) == NOT_ALLOWED: + self.logger.info("Access denied for %s", + "'%s'" % user if user else "anonymous user") else: status, headers, answer = NOT_ALLOWED if (status, headers, answer) == NOT_ALLOWED and not ( user and is_authenticated): # Unknown or unauthorized user - self.logger.info("%s refused" % (user or "Anonymous user")) + self.logger.debug("Asking client for authentication") status = client.UNAUTHORIZED realm = self.configuration.get("server", "realm") headers = dict(headers) diff --git a/radicale/auth.py b/radicale/auth.py index 8ae493a..0d5a925 100644 --- a/radicale/auth.py +++ b/radicale/auth.py @@ -56,9 +56,8 @@ following significantly more secure schemes are parsable by Radicale: import base64 import functools import hashlib +import hmac import os -import random -import time from importlib import import_module @@ -148,11 +147,12 @@ class Auth(BaseAuth): def _plain(self, hash_value, password): """Check if ``hash_value`` and ``password`` match, plain method.""" - return hash_value == password + return hmac.compare_digest(hash_value, password) def _crypt(self, crypt, hash_value, password): """Check if ``hash_value`` and ``password`` match, crypt method.""" - return crypt.crypt(password, hash_value) == hash_value + return hmac.compare_digest(crypt.crypt(password, hash_value), + hash_value) def _sha1(self, hash_value, password): """Check if ``hash_value`` and ``password`` match, sha1 method.""" @@ -160,7 +160,7 @@ class Auth(BaseAuth): password = password.encode(self.configuration.get("encoding", "stock")) sha1 = hashlib.sha1() sha1.update(password) - return sha1.digest() == base64.b64decode(hash_value) + return hmac.compare_digest(sha1.digest(), base64.b64decode(hash_value)) def _ssha(self, hash_value, password): """Check if ``hash_value`` and ``password`` match, salted sha1 method. @@ -169,15 +169,15 @@ class Auth(BaseAuth): written with e.g. openssl, and nginx can parse it. """ - hash_value = hash_value.replace( - "{SSHA}", "").encode("ascii").decode("base64") + hash_value = base64.b64decode(hash_value.replace( + "{SSHA}", "").encode("ascii")) password = password.encode(self.configuration.get("encoding", "stock")) - hash_value = hash_value[:20] salt_value = hash_value[20:] + hash_value = hash_value[:20] sha1 = hashlib.sha1() sha1.update(password) sha1.update(salt_value) - return sha1.digest() == hash_value + return hmac.compare_digest(sha1.digest(), hash_value) def _bcrypt(self, bcrypt, hash_value, password): return bcrypt.verify(password, hash_value) @@ -196,6 +196,4 @@ class Auth(BaseAuth): login, hash_value = line.split(":") if login == user and self.verify(hash_value, password): return True - # Random timer to avoid timing oracles and simple bruteforce attacks - time.sleep(1 + random.random()) return False diff --git a/radicale/config.py b/radicale/config.py index e37ee08..d7b4586 100644 --- a/radicale/config.py +++ b/radicale/config.py @@ -93,7 +93,10 @@ INITIAL_CONFIG = OrderedDict([ "help": "htpasswd filename"}), ("htpasswd_encryption", { "value": "bcrypt", - "help": "htpasswd encryption method"})])), + "help": "htpasswd encryption method"}), + ("delay", { + "value": "1", + "help": "incorrect authentication delay"})])), ("rights", OrderedDict([ ("type", { "value": "owner_only", diff --git a/radicale/tests/test_auth.py b/radicale/tests/test_auth.py index d25208d..5b80e86 100644 --- a/radicale/tests/test_auth.py +++ b/radicale/tests/test_auth.py @@ -21,15 +21,15 @@ Radicale tests with simple requests and authentication. """ import base64 -import hashlib import logging import os import shutil import tempfile +import pytest from radicale import Application, config -from . import BaseTest +from .test_base import BaseTest class TestBaseAuthRequests(BaseTest): @@ -39,38 +39,80 @@ class TestBaseAuthRequests(BaseTest): """ def setup(self): + self.configuration = config.load() + self.logger = logging.getLogger("radicale_test") self.colpath = tempfile.mkdtemp() + self.configuration.set("storage", "filesystem_folder", self.colpath) + # Disable syncing to disk for better performance + self.configuration.set("storage", "filesystem_fsync", "False") + # Required on Windows, doesn't matter on Unix + self.configuration.set("storage", "close_lock_file", "True") + # Set incorrect authentication delay to a very low value + self.configuration.set("auth", "delay", "0.002") def teardown(self): shutil.rmtree(self.colpath) - def test_root(self): - """Htpasswd authentication.""" + def _test_htpasswd(self, htpasswd_encryption, htpasswd_content): + """Test htpasswd authentication with user "tmp" and password "bepo".""" htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") - with open(htpasswd_file_path, "wb") as fd: - fd.write(b"tmp:{SHA}" + base64.b64encode( - hashlib.sha1(b"bepo").digest())) + with open(htpasswd_file_path, "w") as f: + f.write(htpasswd_content) + self.configuration.set("auth", "type", "htpasswd") + self.configuration.set("auth", "htpasswd_filename", htpasswd_file_path) + self.configuration.set("auth", "htpasswd_encryption", + htpasswd_encryption) + self.application = Application(self.configuration, self.logger) + for user, password, expeced_status in ( + ("tmp", "bepo", 207), ("tmp", "tmp", 401), ("tmp", "", 401), + ("unk", "unk", 401), ("unk", "", 401), ("", "", 401)): + status, headers, answer = self.request( + "PROPFIND", "/", + HTTP_AUTHORIZATION="Basic %s" % base64.b64encode( + ("%s:%s" % (user, password)).encode()).decode()) + assert status == expeced_status - configuration = config.load() - configuration.set("auth", "type", "htpasswd") - configuration.set("auth", "htpasswd_filename", htpasswd_file_path) - configuration.set("auth", "htpasswd_encryption", "sha1") + def test_htpasswd_plain(self): + self._test_htpasswd("plain", "tmp:bepo") - self.application = Application( - configuration, logging.getLogger("radicale_test")) + def test_htpasswd_sha1(self): + self._test_htpasswd("sha1", "tmp:{SHA}UWRS3uSJJq2itZQEUyIH8rRajCM=") - status, headers, answer = self.request( - "GET", "/", HTTP_AUTHORIZATION="dG1wOmJlcG8=") - assert status == 200 - assert "Radicale works!" in answer + def test_htpasswd_ssha(self): + self._test_htpasswd("ssha", "tmp:{SSHA}qbD1diw9RJKi0DnW4qO8WX9SE18W") + + def test_htpasswd_md5(self): + try: + import passlib # noqa: F401 + except ImportError: + pytest.skip("passlib is not installed") + self._test_htpasswd("md5", "tmp:$apr1$BI7VKCZh$GKW4vq2hqDINMr8uv7lDY/") + + def test_htpasswd_crypt(self): + try: + import crypt # noqa: F401 + except ImportError: + pytest.skip("crypt is not installed") + self._test_htpasswd("crypt", "tmp:dxUqxoThMs04k") + + def test_htpasswd_bcrypt(self): + try: + from passlib.hash import bcrypt + from passlib.exc import MissingBackendError + except ImportError: + pytest.skip("passlib is not installed") + try: + bcrypt.encrypt("test-bcrypt-backend") + except MissingBackendError: + pytest.skip("bcrypt backend for passlib is not installed") + self._test_htpasswd( + "bcrypt", + "tmp:$2y$05$oD7hbiQFQlvCM7zoalo/T.MssV3VNTRI3w5KDnj8NTUKJNWfVpvRq") def test_custom(self): """Custom authentication.""" - configuration = config.load() - configuration.set("auth", "type", "tests.custom.auth") - self.application = Application( - configuration, logging.getLogger("radicale_test")) - + self.configuration.set("auth", "type", "tests.custom.auth") + self.application = Application(self.configuration, self.logger) status, headers, answer = self.request( "GET", "/", HTTP_AUTHORIZATION="dG1wOmJlcG8=") assert status == 200