Clean rights from file and remove tests

This commit is contained in:
Guillaume Ayoub 2012-09-15 09:08:01 +02:00
parent 7b15832dbf
commit 4c064bcf35
7 changed files with 123 additions and 387 deletions

View File

@ -51,16 +51,12 @@ from . import auth, config, ical, log, rights, storage, xmlutils
VERSION = "git" VERSION = "git"
# Standard "not allowed" response that is returned when an authenticated # Standard "not allowed" response that is returned when an authenticated user
# user tries to access information they don't have rights to. # tries to access information they don't have rights to
NOT_ALLOWED = ( NOT_ALLOWED = (client.FORBIDDEN, {}, None)
client.FORBIDDEN,
{},
None)
# Standard "authenticate" response that is returned when a # Standard "authenticate" response that is returned when a user tries to access
# user tries to access non-public information w/o submitting # non-public information w/o submitting proper authentication credentials
# proper authentication credentials
WRONG_CREDENTIALS = ( WRONG_CREDENTIALS = (
client.UNAUTHORIZED, client.UNAUTHORIZED,
{"WWW-Authenticate": "Basic realm=\"Radicale - Password Required\""}, {"WWW-Authenticate": "Basic realm=\"Radicale - Password Required\""},
@ -185,58 +181,64 @@ class Application(object):
trailing_slash = "" if uri == "/" else trailing_slash trailing_slash = "" if uri == "/" else trailing_slash
return uri + trailing_slash return uri + trailing_slash
def collect_allowed_items(self, items, user): def collect_allowed_items(self, items, user):
""" Collect those items from the request that the user """Get items from request that user is allowed to access."""
is actually allowed to access """
read_last_collection_allowed = None read_last_collection_allowed = None
write_last_collection_allowed = None write_last_collection_allowed = None
read_allowed_items = [] read_allowed_items = []
write_allowed_items = [] write_allowed_items = []
for item in items: for item in items:
if isinstance(item, ical.Collection): if isinstance(item, ical.Collection):
if rights.read_authorized(user, item): if rights.read_authorized(user, item):
log.LOGGER.debug("%s has read access to collection %s" % (user, item.url or "/")) log.LOGGER.debug(
"%s has read access to collection %s" %
(user or "Anonymous", item.url or "/"))
read_last_collection_allowed = True read_last_collection_allowed = True
read_allowed_items.append(item) read_allowed_items.append(item)
else: else:
log.LOGGER.debug("%s has NO read access to collection %s" % (user, item.url or "/")) log.LOGGER.debug(
"%s has NO read access to collection %s" %
(user or "Anonymous", item.url or "/"))
read_last_collection_allowed = False read_last_collection_allowed = False
if rights.write_authorized(user, item): if rights.write_authorized(user, item):
log.LOGGER.debug("%s has write access to collection %s" % (user, item.url or "/")) log.LOGGER.debug(
"%s has write access to collection %s" %
(user or "Anonymous", item.url or "/"))
write_last_collection_allowed = True write_last_collection_allowed = True
write_allowed_items.append(item) write_allowed_items.append(item)
else: else:
log.LOGGER.debug("%s has NO write access to collection %s" % (user, item.url or "/")) log.LOGGER.debug(
"%s has NO write access to collection %s" %
(user or "Anonymous", item.url or "/"))
write_last_collection_allowed = False write_last_collection_allowed = False
else:
# item is not a collection, it's the child of the last # item is not a collection, it's the child of the last
# collection we've met in the loop. Only add this item # collection we've met in the loop. Only add this item
# if this last collection was allowed. # if this last collection was allowed.
else:
if read_last_collection_allowed: if read_last_collection_allowed:
log.LOGGER.debug("%s has read access to item %s" % (user, item.name or "/")) log.LOGGER.debug(
"%s has read access to item %s" %
(user or "Anonymous", item.name))
read_allowed_items.append(item) read_allowed_items.append(item)
else:
log.LOGGER.debug(
"%s has NO read access to item %s" %
(user or "Anonymous", item.name))
if write_last_collection_allowed: if write_last_collection_allowed:
log.LOGGER.debug("%s has write access to item %s" % (user, item.name or "/")) log.LOGGER.debug(
"%s has write access to item %s" %
(user or "Anonymous", item.name))
write_allowed_items.append(item) write_allowed_items.append(item)
else:
if (not write_last_collection_allowed) and (not read_last_collection_allowed): log.LOGGER.debug(
log.LOGGER.info("%s has NO access to item %s" % (user, item.name or "/")) "%s has NO write access to item %s" %
(user or "Anonymous", item.name))
return read_allowed_items, write_allowed_items return read_allowed_items, write_allowed_items
def _union(self, list1, list2):
out = []
out.extend(list1)
for thing in list2:
if not thing in list1:
list1.append(thing)
return out
def __call__(self, environ, start_response): def __call__(self, environ, start_response):
"""Manage a request.""" """Manage a request."""
log.LOGGER.info("%s request at %s received" % ( log.LOGGER.info("%s request at %s received" % (
@ -280,14 +282,17 @@ class Application(object):
if not items or function == self.options or \ if not items or function == self.options or \
auth.is_authenticated(user, password): auth.is_authenticated(user, password):
read_allowed_items, write_allowed_items = self.collect_allowed_items(items, user) read_allowed_items, write_allowed_items = \
self.collect_allowed_items(items, user)
if read_allowed_items or write_allowed_items or function == self.options: if read_allowed_items or write_allowed_items or \
function == self.options:
# Collections found # Collections found
status, headers, answer = function( status, headers, answer = function(
environ, read_allowed_items, write_allowed_items, content, user) environ, read_allowed_items, write_allowed_items, content,
user)
else: else:
# Good user but has no rights to any of the given collections # Good user but has no rights to any of the given collections
status, headers, answer = NOT_ALLOWED status, headers, answer = NOT_ALLOWED
else: else:
# Unknown or unauthorized user # Unknown or unauthorized user
@ -312,11 +317,12 @@ class Application(object):
# All these functions must have the same parameters, some are useless # All these functions must have the same parameters, some are useless
# pylint: disable=W0612,W0613,R0201 # pylint: disable=W0612,W0613,R0201
def delete(self, environ, read_collections, write_collections, content, user): def delete(self, environ, read_collections, write_collections, content,
user):
"""Manage DELETE request.""" """Manage DELETE request."""
if not len(write_collections): if not len(write_collections):
return client.PRECONDITION_FAILED, {}, None return client.PRECONDITION_FAILED, {}, None
collection = write_collections[0] collection = write_collections[0]
if collection.path == environ["PATH_INFO"].strip("/"): if collection.path == environ["PATH_INFO"].strip("/"):
@ -354,9 +360,9 @@ class Application(object):
if not len(read_collections): if not len(read_collections):
return NOT_ALLOWED return NOT_ALLOWED
collection = read_collections[0] collection = read_collections[0]
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection) item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
if item_name: if item_name:
@ -374,10 +380,13 @@ class Application(object):
# Create the collection if it does not exist # Create the collection if it does not exist
if not collection.exists: if not collection.exists:
if collection in write_collections: if collection in write_collections:
log.LOGGER.debug("Creating collection %s" % collection.name) log.LOGGER.debug(
"Creating collection %s" % collection.name)
collection.write() collection.write()
else: else:
log.LOGGER.debug("Collection %s not available and could not be created due to missing write rights" % collection.name) log.LOGGER.debug(
"Collection %s not available and could not be created "
"due to missing write rights" % collection.name)
return NOT_ALLOWED return NOT_ALLOWED
# Get whole collection # Get whole collection
@ -391,18 +400,21 @@ class Application(object):
answer = answer_text.encode(self.encoding) answer = answer_text.encode(self.encoding)
return client.OK, headers, answer return client.OK, headers, answer
def head(self, environ, read_collections, write_collections, content, user): def head(self, environ, read_collections, write_collections, content,
user):
"""Manage HEAD request.""" """Manage HEAD request."""
status, headers, answer = self.get(environ, read_collections, write_collections, content, user) status, headers, answer = self.get(
environ, read_collections, write_collections, content, user)
return status, headers, None return status, headers, None
def mkcalendar(self, environ, read_collections, write_collections, content, user): def mkcalendar(self, environ, read_collections, write_collections, content,
user):
"""Manage MKCALENDAR request.""" """Manage MKCALENDAR request."""
if not len(write_collections): if not len(write_collections):
return NOT_ALLOWED return NOT_ALLOWED
collection = write_collections[0] collection = write_collections[0]
props = xmlutils.props_from_request(content) props = xmlutils.props_from_request(content)
timezone = props.get("C:calendar-timezone") timezone = props.get("C:calendar-timezone")
if timezone: if timezone:
@ -414,13 +426,14 @@ class Application(object):
collection.write() collection.write()
return client.CREATED, {}, None return client.CREATED, {}, None
def mkcol(self, environ, read_collections, write_collections, content, user): def mkcol(self, environ, read_collections, write_collections, content,
user):
"""Manage MKCOL request.""" """Manage MKCOL request."""
if not len(write_collections): if not len(write_collections):
return NOT_ALLOWED return NOT_ALLOWED
collection = write_collections[0] collection = write_collections[0]
props = xmlutils.props_from_request(content) props = xmlutils.props_from_request(content)
with collection.props as collection_props: with collection.props as collection_props:
for key, value in props.items(): for key, value in props.items():
@ -428,13 +441,14 @@ class Application(object):
collection.write() collection.write()
return client.CREATED, {}, None return client.CREATED, {}, None
def move(self, environ, read_collections, write_collections, content, user): def move(self, environ, read_collections, write_collections, content,
user):
"""Manage MOVE request.""" """Manage MOVE request."""
if not len(write_collections): if not len(write_collections):
return NOT_ALLOWED return NOT_ALLOWED
from_collection = write_collections[0] from_collection = write_collections[0]
from_name = xmlutils.name_from_path( from_name = xmlutils.name_from_path(
environ["PATH_INFO"], from_collection) environ["PATH_INFO"], from_collection)
if from_name: if from_name:
@ -463,7 +477,8 @@ class Application(object):
# Moving collections, not supported # Moving collections, not supported
return client.FORBIDDEN, {}, None return client.FORBIDDEN, {}, None
def options(self, environ, read_collections, write_collections, content, user): def options(self, environ, read_collections, write_collections, content,
user):
"""Manage OPTIONS request.""" """Manage OPTIONS request."""
headers = { headers = {
"Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, " "Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, "
@ -471,24 +486,26 @@ class Application(object):
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"} "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"}
return client.OK, headers, None return client.OK, headers, None
def propfind(self, environ, read_collections, write_collections, content, user): def propfind(self, environ, read_collections, write_collections, content,
user):
"""Manage PROPFIND request.""" """Manage PROPFIND request."""
# Rights is handled by collection in xmlutils.propfind # Rights is handled by collection in xmlutils.propfind
headers = { headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol", "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"} "Content-Type": "text/xml"}
collections = self._union(read_collections, write_collections) collections = set(read_collections + write_collections)
answer = xmlutils.propfind( answer = xmlutils.propfind(
environ["PATH_INFO"], content, collections, user) environ["PATH_INFO"], content, collections, user)
return client.MULTI_STATUS, headers, answer return client.MULTI_STATUS, headers, answer
def proppatch(self, environ, read_collections, write_collections, content, user): def proppatch(self, environ, read_collections, write_collections, content,
user):
"""Manage PROPPATCH request.""" """Manage PROPPATCH request."""
if not len(write_collections): if not len(write_collections):
return NOT_ALLOWED return NOT_ALLOWED
collection = write_collections[0] collection = write_collections[0]
answer = xmlutils.proppatch( answer = xmlutils.proppatch(
environ["PATH_INFO"], content, collection) environ["PATH_INFO"], content, collection)
headers = { headers = {
@ -500,9 +517,9 @@ class Application(object):
"""Manage PUT request.""" """Manage PUT request."""
if not len(write_collections): if not len(write_collections):
return NOT_ALLOWED return NOT_ALLOWED
collection = write_collections[0] collection = write_collections[0]
collection.set_mimetype(environ.get("CONTENT_TYPE")) collection.set_mimetype(environ.get("CONTENT_TYPE"))
headers = {} headers = {}
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection) item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
@ -531,15 +548,16 @@ class Application(object):
status = client.PRECONDITION_FAILED status = client.PRECONDITION_FAILED
return status, headers, None return status, headers, None
def report(self, environ, read_collections, write_collections, content, user): def report(self, environ, read_collections, write_collections, content,
user):
"""Manage REPORT request.""" """Manage REPORT request."""
if not len(read_collections): if not len(read_collections):
return NOT_ALLOWED return NOT_ALLOWED
collection = read_collections[0] collection = read_collections[0]
headers = {"Content-Type": "text/xml"} headers = {"Content-Type": "text/xml"}
answer = xmlutils.report(environ["PATH_INFO"], content, collection) answer = xmlutils.report(environ["PATH_INFO"], content, collection)
return client.MULTI_STATUS, headers, answer return client.MULTI_STATUS, headers, answer

View File

@ -68,7 +68,7 @@ INITIAL_CONFIG = {
"courier_socket": ""}, "courier_socket": ""},
"rights": { "rights": {
"type": "None", "type": "None",
"file" : "None"}, "file": ""},
"storage": { "storage": {
"type": "filesystem", "type": "filesystem",
"filesystem_folder": os.path.expanduser( "filesystem_folder": os.path.expanduser(

View File

@ -21,156 +21,60 @@ File-based rights.
The owner is implied to have all rights on their collections. The owner is implied to have all rights on their collections.
Rights are read from a file whose name is specified in the config Rights are read from a file whose name is specified in the config (section
(section "right", key "file"). "right", key "file").
The file's format is per line:
collectionpath ":" principal " " rights {", " principal " " rights}*
collectionpath is the path part of the collection's url
principal is a user name (no whitespace allowed)
rights is a string w/o whitespace that contains "r" for reading rights,
"w" for writing rights and a combination of these for all rights.
Empty lines are ignored. Lines starting with "#" (hash sign) are comments.
Example: Example:
# This means user1 may read, user2 may write, user3 has full access # This means user1 may read, user2 may write, user3 has full access
/user0/calendar : user1 r, user2 w, user3 rw [/user0/calendar]
user1: r
user2: w
user3: rw
# user0 can read /user1/cal # user0 can read /user1/cal
/user1/cal : user0 r [/user1/cal]
user0: r
If a collection /a/b is shared and other users than the owner are # If a collection /a/b is shared and other users than the owner are supposed to
supposed to find the collection in a propfind request, an additional # find the collection in a propfind request, an additional line for /a has to
line for /a has to be in the defintions. E.g.: # be in the defintions. E.g.:
/user0/cal: user [/user0]
user1: r
""" """
from radicale import config, log from radicale import config, log
from radicale.rights import owner_only from radicale.rights import owner_only
# Manage Python2/3 different modules
# pylint: disable=F0401
try:
from configparser import RawConfigParser as ConfigParser
except ImportError:
from ConfigParser import RawConfigParser as ConfigParser
# pylint: enable=F0401
READ_AUTHORIZED = None FILENAME = config.get("rights", "file")
WRITE_AUTHORIZED = None if FILENAME:
log.LOGGER.debug("Reading rights from file %s" % FILENAME)
RIGHTS = ConfigParser()
class ParsingError(BaseException): RIGHTS.read(FILENAME)
"""Raised if the file cannot be parsed""" else:
log.LOGGER.error("No file name configured for rights type 'from_file'")
RIGHTS = None
def read_authorized(user, collection): def read_authorized(user, collection):
"""Check if the user is allowed to read the collection.""" """Check if the user is allowed to read the collection."""
if owner_only.read_authorized(user, collection): return (
return True owner_only.read_authorized(user, collection) or
"r" in RIGHTS.get(collection.url.rstrip("/") or "/", user))
curl = _normalize_trail_slash(collection.url)
return _dict_knows(READ_AUTHORIZED, curl, user)
def write_authorized(user, collection): def write_authorized(user, collection):
"""Check if the user is allowed to write the collection.""" """Check if the user is allowed to write the collection."""
if owner_only.read_authorized(user, collection): return (
return True owner_only.write_authorized(user, collection) or
"w" in RIGHTS.get(collection.url.rstrip("/") or "/", user))
curl = _normalize_trail_slash(collection.url)
return _dict_knows(WRITE_AUTHORIZED, curl, user)
def _dict_knows(adict, url, user):
return adict.has_key(url) and adict.get(url).count(user) != 0
def _load():
read = {}
write = {}
file_name = config.get("rights", "file")
if file_name == "None":
log.LOGGER.error("No file name configured for rights type 'from_file'")
return
log.LOGGER.debug("Reading rights from file %s" % file_name)
lines = open(file_name, "r").readlines()
for line in lines:
_process(line, read, write)
global READ_AUTHORIZED, WRITE_AUTHORIZED
READ_AUTHORIZED = read
WRITE_AUTHORIZED = write
def _process(line, read, write):
line = line.strip()
if line == "":
"""Empty line"""
return
if line.startswith("#"):
"""Comment"""
return
collection, sep, rights_part = line.partition(":")
rights_part = rights_part.strip()
if rights_part == "":
return
collection = collection.strip()
if collection == "":
raise ParsingError
collection = _normalize_trail_slash(collection)
rights = rights_part.split(",")
for right in rights:
user, sep, right_defs = right.strip().partition(" ")
if user == "" or right_defs == "":
raise ParsingError
user = user.strip()
right_defs = right_defs.strip()
for right_def in list(right_defs):
if right_def == 'r':
_append(read, collection, user)
elif right_def == 'w':
_append(write, collection, user)
else:
raise ParsingError
def _append(rdict, key, value):
if rdict.has_key(key):
rlist = rdict[key]
rlist.append(value)
else:
rlist = [value]
rdict[key] = rlist
def _normalize_trail_slash(s):
"""Removes a maybe existing trailing slash"""
if s != "/" and s.endswith("/"):
s, sep, empty = s.rpartition("/")
return s
_load()

View File

@ -188,7 +188,7 @@ def propfind(path, xml_request, collections, user=None):
"""Read and answer PROPFIND requests. """Read and answer PROPFIND requests.
Read rfc4918-9.1 for info. Read rfc4918-9.1 for info.
The collections parameter is a list of collections that are The collections parameter is a list of collections that are
to be included in the output. Rights checking has to be done to be included in the output. Rights checking has to be done
by the caller. by the caller.

View File

@ -36,23 +36,8 @@ For further information, please visit the `Radicale Website
""" """
from distutils.core import setup, Command from distutils.core import setup
import unittest
import radicale import radicale
import sys
class RunTests(Command):
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
tests = unittest.defaultTestLoader.discover("test/python")
result = unittest.TextTestRunner(stream=sys.stdout, verbosity=99)._makeResult()
tests.run(result)
# When the version is updated, ``radicale.VERSION`` must be modified. # When the version is updated, ``radicale.VERSION`` must be modified.
@ -73,7 +58,6 @@ setup(
"radicale", "radicale.auth", "radicale.rights", "radicale.storage"], "radicale", "radicale.auth", "radicale.rights", "radicale.storage"],
provides=["radicale"], provides=["radicale"],
scripts=["bin/radicale"], scripts=["bin/radicale"],
cmdclass={'test': RunTests},
keywords=["calendar", "addressbook", "CalDAV", "CardDAV"], keywords=["calendar", "addressbook", "CalDAV", "CardDAV"],
classifiers=[ classifiers=[
"Development Status :: 4 - Beta", "Development Status :: 4 - Beta",

View File

@ -1,7 +0,0 @@
'''
Created on 09.08.2012
Tests for rights-related code.
@author: mj
'''

View File

@ -1,163 +0,0 @@
"""
Unit test for radicale.rights.from_file.
Tests reading the file. The processing is untested, yet.
"""
from radicale.rights import from_file
import unittest
class Test1(unittest.TestCase):
def testProcessEmptyLine(self):
""" Line with a comment """
# Input values
line = " "
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except from_file.ParsingError:
self.assertTrue(False)
self.assertTrue(len(read.keys()) == 0)
self.assertTrue(len(write.keys()) == 0)
def testProcessComment(self):
""" Line with a comment """
# Input values
line = "# some comment"
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except from_file.ParsingError:
self.assertTrue(False)
self.assertTrue(len(read.keys()) == 0)
self.assertTrue(len(write.keys()) == 0)
def testProcess0a(self):
""" Pointless line: no rights definitions """
# Input values
line = "/user1/collection1 : "
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except from_file.ParsingError:
self.fail("Unexpected exception")
self.assertTrue(len(read.keys()) == 0)
self.assertTrue(len(write.keys()) == 0)
def testProcess1a(self):
""" Malformed line: no collection definitions """
# Input values
line = " : a b"
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except from_file.ParsingError:
"""Exception expected"""
else:
self.fail("Expected exception not raised")
def testProcess1b(self):
""" Malformed line: right "b" unknown """
# Input values
line = "/user1/collection1 : a b"
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except from_file.ParsingError:
"""Exception expected"""
else:
self.fail("Expected exception not raised")
def testProcess1c(self):
""" Malformed line: user/right empty """
# Input values
line = "/user1/collection1 : a"
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except from_file.ParsingError:
"""Exception expected"""
else:
self.fail("Expected exception not raised")
def testProcess2(self):
"""Actual sensible input all of which means the same"""
lines = [
"/user1/collection1 : other1 r, other2 w, other6 rw",
"/user1/collection1/ : other1 r, other2 w, other6 rw",
"/user1/collection1: other1 r, other2 w, other6 rw",
"/user1/collection1/: other1 r, other2 w, other6 rw",
"/user1/collection1: other1 r, other2 w,other6 rw",
"/user1/collection1 :other1 r,other2 w, other6 rw",
"/user1/collection1\t:other1 r,\tother2 w,\tother6 rw",
]
for line in lines:
# Input values
read = {}
write = {}
try:
# Call SUT
from_file._process(line, read, write)
except:
self.fail("unexpected exception for input %s" % line)
# Check
self.assertEquals(len(read.keys()), 1, "keys in %s" % line)
self.assertEquals(len(read.get("/user1/collection1")), 2, "rights in %s" % line)
self.assertTrue(read.get("/user1/collection1").count("other1"), "other1 read in %s" % line)
self.assertTrue(read.get("/user1/collection1").count("other6"), "other6 read in %s" % line)
self.assertEquals(len(write.keys()), 1, "keys in %s" % line)
self.assertEquals(len(write.get("/user1/collection1")), 2, "rights in %s" % line)
self.assertTrue(write.get("/user1/collection1").count("other2"), "other2 write in %s" % line)
self.assertTrue(write.get("/user1/collection1").count("other6"), "other6 write in %s" % line)
if __name__ == "__main__":
unittest.main()