Merge pull request #600 from Unrud/auth

Test and fix auth module. Configurable delay. Improve logging.
This commit is contained in:
Guillaume Ayoub 2017-05-23 12:22:16 +02:00 committed by GitHub
commit 1b5bfee96c
5 changed files with 99 additions and 39 deletions

3
config
View File

@ -78,6 +78,9 @@
# bcrypt and md5 require the passlib library to be installed.
#htpasswd_encryption = bcrypt
# Incorrect authentication delay (seconds)
#delay = 1
[rights]

View File

@ -34,11 +34,13 @@ import itertools
import os
import posixpath
import pprint
import random
import socket
import socketserver
import ssl
import sys
import threading
import time
import traceback
import wsgiref.simple_server
import zlib
@ -312,7 +314,7 @@ class Application:
status = "%i %s" % (
status, client.responses.get(status, "Unknown"))
self.logger.info(
"%s answer status for %s in %s sec: %s",
"%s answer status for %s in %.3f seconds: %s",
environ["REQUEST_METHOD"], environ["PATH_INFO"] + depthinfo,
(time_end - time_begin).total_seconds(), status)
start_response(status, list(headers.items()))
@ -375,13 +377,22 @@ class Application:
if path == "/.well-known" or path.startswith("/.well-known/"):
return response(*NOT_FOUND)
if user and not storage.is_safe_path_component(user):
if not user:
is_authenticated = True
elif not storage.is_safe_path_component(user):
# Prevent usernames like "user/calendar.ics"
self.logger.info("Refused unsafe username: %s", user)
is_authenticated = False
else:
is_authenticated = self.Auth.is_authenticated(user, password)
is_valid_user = is_authenticated or not user
if not is_authenticated:
self.logger.info("Failed login attempt: %s", user)
# Random delay to avoid timing oracles and bruteforce attacks
delay = self.configuration.getfloat("auth", "delay")
if delay > 0:
random_delay = delay * (0.5 + random.random())
self.logger.debug("Sleeping %.3f seconds", random_delay)
time.sleep(random_delay)
# Create principal collection
if user and is_authenticated:
@ -405,19 +416,22 @@ class Application:
"Request body too large: %d", content_length)
return response(*REQUEST_ENTITY_TOO_LARGE)
if is_valid_user:
if is_authenticated:
try:
status, headers, answer = function(
environ, base_prefix, path, user)
except socket.timeout:
return response(*REQUEST_TIMEOUT)
if (status, headers, answer) == NOT_ALLOWED:
self.logger.info("Access denied for %s",
"'%s'" % user if user else "anonymous user")
else:
status, headers, answer = NOT_ALLOWED
if (status, headers, answer) == NOT_ALLOWED and not (
user and is_authenticated):
# Unknown or unauthorized user
self.logger.info("%s refused" % (user or "Anonymous user"))
self.logger.debug("Asking client for authentication")
status = client.UNAUTHORIZED
realm = self.configuration.get("server", "realm")
headers = dict(headers)

View File

@ -56,9 +56,8 @@ following significantly more secure schemes are parsable by Radicale:
import base64
import functools
import hashlib
import hmac
import os
import random
import time
from importlib import import_module
@ -148,11 +147,12 @@ class Auth(BaseAuth):
def _plain(self, hash_value, password):
"""Check if ``hash_value`` and ``password`` match, plain method."""
return hash_value == password
return hmac.compare_digest(hash_value, password)
def _crypt(self, crypt, hash_value, password):
"""Check if ``hash_value`` and ``password`` match, crypt method."""
return crypt.crypt(password, hash_value) == hash_value
return hmac.compare_digest(crypt.crypt(password, hash_value),
hash_value)
def _sha1(self, hash_value, password):
"""Check if ``hash_value`` and ``password`` match, sha1 method."""
@ -160,7 +160,7 @@ class Auth(BaseAuth):
password = password.encode(self.configuration.get("encoding", "stock"))
sha1 = hashlib.sha1()
sha1.update(password)
return sha1.digest() == base64.b64decode(hash_value)
return hmac.compare_digest(sha1.digest(), base64.b64decode(hash_value))
def _ssha(self, hash_value, password):
"""Check if ``hash_value`` and ``password`` match, salted sha1 method.
@ -169,15 +169,15 @@ class Auth(BaseAuth):
written with e.g. openssl, and nginx can parse it.
"""
hash_value = hash_value.replace(
"{SSHA}", "").encode("ascii").decode("base64")
hash_value = base64.b64decode(hash_value.replace(
"{SSHA}", "").encode("ascii"))
password = password.encode(self.configuration.get("encoding", "stock"))
hash_value = hash_value[:20]
salt_value = hash_value[20:]
hash_value = hash_value[:20]
sha1 = hashlib.sha1()
sha1.update(password)
sha1.update(salt_value)
return sha1.digest() == hash_value
return hmac.compare_digest(sha1.digest(), hash_value)
def _bcrypt(self, bcrypt, hash_value, password):
return bcrypt.verify(password, hash_value)
@ -196,6 +196,4 @@ class Auth(BaseAuth):
login, hash_value = line.split(":")
if login == user and self.verify(hash_value, password):
return True
# Random timer to avoid timing oracles and simple bruteforce attacks
time.sleep(1 + random.random())
return False

View File

@ -93,7 +93,10 @@ INITIAL_CONFIG = OrderedDict([
"help": "htpasswd filename"}),
("htpasswd_encryption", {
"value": "bcrypt",
"help": "htpasswd encryption method"})])),
"help": "htpasswd encryption method"}),
("delay", {
"value": "1",
"help": "incorrect authentication delay"})])),
("rights", OrderedDict([
("type", {
"value": "owner_only",

View File

@ -21,15 +21,15 @@ Radicale tests with simple requests and authentication.
"""
import base64
import hashlib
import logging
import os
import shutil
import tempfile
import pytest
from radicale import Application, config
from . import BaseTest
from .test_base import BaseTest
class TestBaseAuthRequests(BaseTest):
@ -39,38 +39,80 @@ class TestBaseAuthRequests(BaseTest):
"""
def setup(self):
self.configuration = config.load()
self.logger = logging.getLogger("radicale_test")
self.colpath = tempfile.mkdtemp()
self.configuration.set("storage", "filesystem_folder", self.colpath)
# Disable syncing to disk for better performance
self.configuration.set("storage", "filesystem_fsync", "False")
# Required on Windows, doesn't matter on Unix
self.configuration.set("storage", "close_lock_file", "True")
# Set incorrect authentication delay to a very low value
self.configuration.set("auth", "delay", "0.002")
def teardown(self):
shutil.rmtree(self.colpath)
def test_root(self):
"""Htpasswd authentication."""
def _test_htpasswd(self, htpasswd_encryption, htpasswd_content):
"""Test htpasswd authentication with user "tmp" and password "bepo"."""
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
with open(htpasswd_file_path, "wb") as fd:
fd.write(b"tmp:{SHA}" + base64.b64encode(
hashlib.sha1(b"bepo").digest()))
with open(htpasswd_file_path, "w") as f:
f.write(htpasswd_content)
self.configuration.set("auth", "type", "htpasswd")
self.configuration.set("auth", "htpasswd_filename", htpasswd_file_path)
self.configuration.set("auth", "htpasswd_encryption",
htpasswd_encryption)
self.application = Application(self.configuration, self.logger)
for user, password, expeced_status in (
("tmp", "bepo", 207), ("tmp", "tmp", 401), ("tmp", "", 401),
("unk", "unk", 401), ("unk", "", 401), ("", "", 401)):
status, headers, answer = self.request(
"PROPFIND", "/",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(
("%s:%s" % (user, password)).encode()).decode())
assert status == expeced_status
configuration = config.load()
configuration.set("auth", "type", "htpasswd")
configuration.set("auth", "htpasswd_filename", htpasswd_file_path)
configuration.set("auth", "htpasswd_encryption", "sha1")
def test_htpasswd_plain(self):
self._test_htpasswd("plain", "tmp:bepo")
self.application = Application(
configuration, logging.getLogger("radicale_test"))
def test_htpasswd_sha1(self):
self._test_htpasswd("sha1", "tmp:{SHA}UWRS3uSJJq2itZQEUyIH8rRajCM=")
status, headers, answer = self.request(
"GET", "/", HTTP_AUTHORIZATION="dG1wOmJlcG8=")
assert status == 200
assert "Radicale works!" in answer
def test_htpasswd_ssha(self):
self._test_htpasswd("ssha", "tmp:{SSHA}qbD1diw9RJKi0DnW4qO8WX9SE18W")
def test_htpasswd_md5(self):
try:
import passlib # noqa: F401
except ImportError:
pytest.skip("passlib is not installed")
self._test_htpasswd("md5", "tmp:$apr1$BI7VKCZh$GKW4vq2hqDINMr8uv7lDY/")
def test_htpasswd_crypt(self):
try:
import crypt # noqa: F401
except ImportError:
pytest.skip("crypt is not installed")
self._test_htpasswd("crypt", "tmp:dxUqxoThMs04k")
def test_htpasswd_bcrypt(self):
try:
from passlib.hash import bcrypt
from passlib.exc import MissingBackendError
except ImportError:
pytest.skip("passlib is not installed")
try:
bcrypt.encrypt("test-bcrypt-backend")
except MissingBackendError:
pytest.skip("bcrypt backend for passlib is not installed")
self._test_htpasswd(
"bcrypt",
"tmp:$2y$05$oD7hbiQFQlvCM7zoalo/T.MssV3VNTRI3w5KDnj8NTUKJNWfVpvRq")
def test_custom(self):
"""Custom authentication."""
configuration = config.load()
configuration.set("auth", "type", "tests.custom.auth")
self.application = Application(
configuration, logging.getLogger("radicale_test"))
self.configuration.set("auth", "type", "tests.custom.auth")
self.application = Application(self.configuration, self.logger)
status, headers, answer = self.request(
"GET", "/", HTTP_AUTHORIZATION="dG1wOmJlcG8=")
assert status == 200