preliminary iCal/iPhone support introduced

This commit is contained in:
Lukasz Langa 2011-06-01 12:43:49 +02:00
parent 911cd48efe
commit e05e94a129
3 changed files with 201 additions and 113 deletions

View File

@ -29,7 +29,6 @@ should have been included in this package.
"""
import os
import posixpath
import pprint
import base64
import socket
@ -151,28 +150,19 @@ class Application(object):
else:
content = None
# Find calendar
attributes = posixpath.normpath(
environ["PATH_INFO"].strip("/")).split("/")
if attributes:
if attributes[-1].endswith(".ics"):
attributes.pop()
path = "/".join(attributes[:min(len(attributes), 2)])
calendar = ical.Calendar(path)
else:
calendar = None
# Find calendar(s)
items = ical.Calendar.from_path(environ["PATH_INFO"],
environ.get("HTTP_DEPTH", "0"))
# Get function corresponding to method
function = getattr(self, environ["REQUEST_METHOD"].lower())
# Check rights
if not calendar or not self.acl:
if not items or not self.acl:
# No calendar or no acl, don't check rights
status, headers, answer = function(environ, calendar, content)
status, headers, answer = function(environ, items, content)
else:
# Ask authentication backend to check rights
log.LOGGER.info(
"Checking rights for calendar owned by %s" % calendar.owner)
authorization = environ.get("HTTP_AUTHORIZATION", None)
if authorization:
@ -182,11 +172,27 @@ class Application(object):
else:
user = password = None
if self.acl.has_right(calendar.owner, user, password):
log.LOGGER.info("%s allowed" % (user or "anonymous user"))
status, headers, answer = function(environ, calendar, content)
last_allowed = False
calendars = []
for calendar in items:
if not isinstance(calendar, ical.Calendar):
if last_allowed:
calendars.append(calendar)
continue
log.LOGGER.info(
"Checking rights for calendar owned by %s" % calendar.owner)
if self.acl.has_right(calendar.owner, user, password):
log.LOGGER.info("%s allowed" % (user or "anonymous user"))
calendars.append(calendar)
last_allowed = True
else:
log.LOGGER.info("%s refused" % (user or "anonymous user"))
last_allowed = False
if calendars:
status, headers, answer = function(environ, calendars, content)
else:
log.LOGGER.info("%s refused" % (user or "anonymous user"))
status = client.UNAUTHORIZED
headers = {
"WWW-Authenticate":
@ -209,8 +215,9 @@ class Application(object):
# All these functions must have the same parameters, some are useless
# pylint: disable=W0612,W0613,R0201
def get(self, environ, calendar, content):
def get(self, environ, calendars, content):
"""Manage GET request."""
calendar = calendars[0]
item_name = xmlutils.name_from_path(environ["PATH_INFO"], calendar)
if item_name:
# Get calendar item
@ -235,13 +242,14 @@ class Application(object):
answer = answer_text.encode(self.encoding)
return client.OK, headers, answer
def head(self, environ, calendar, content):
def head(self, environ, calendars, content):
"""Manage HEAD request."""
status, headers, answer = self.get(environ, calendar, content)
status, headers, answer = self.get(environ, calendars, content)
return status, headers, None
def delete(self, environ, calendar, content):
def delete(self, environ, calendars, content):
"""Manage DELETE request."""
calendar = calendars[0]
item = calendar.get_item(
xmlutils.name_from_path(environ["PATH_INFO"], calendar))
if item and environ.get("HTTP_IF_MATCH", item.etag) == item.etag:
@ -254,8 +262,9 @@ class Application(object):
status = client.PRECONDITION_FAILED
return status, {}, answer
def mkcalendar(self, environ, calendar, content):
def mkcalendar(self, environ, calendars, content):
"""Manage MKCALENDAR request."""
calendar = calendars[0]
props = xmlutils.props_from_request(content)
tz = props.get('C:calendar-timezone')
if tz:
@ -267,7 +276,7 @@ class Application(object):
calendar.write()
return client.CREATED, {}, None
def options(self, environ, calendar, content):
def options(self, environ, calendars, content):
"""Manage OPTIONS request."""
headers = {
"Allow": "DELETE, HEAD, GET, MKCALENDAR, " \
@ -275,26 +284,27 @@ class Application(object):
"DAV": "1, calendar-access"}
return client.OK, headers, None
def propfind(self, environ, calendar, content):
def propfind(self, environ, calendars, content):
"""Manage PROPFIND request."""
headers = {
"DAV": "1, calendar-access",
"Content-Type": "text/xml"}
answer = xmlutils.propfind(
environ["PATH_INFO"], content, calendar,
environ.get("HTTP_DEPTH", "infinity"))
environ["PATH_INFO"], content, calendars)
return client.MULTI_STATUS, headers, answer
def proppatch(self, environ, calendar, content):
def proppatch(self, environ, calendars, content):
"""Manage PROPPATCH request."""
calendar = calendars[0]
answer = xmlutils.proppatch(environ["PATH_INFO"], content, calendar)
headers = {
"DAV": "1, calendar-access",
"Content-Type": "text/xml"}
return client.MULTI_STATUS, headers, answer
def put(self, environ, calendar, content):
def put(self, environ, calendars, content):
"""Manage PUT request."""
calendar = calendars[0]
headers = {}
item_name = xmlutils.name_from_path(environ["PATH_INFO"], calendar)
item = calendar.get_item(item_name)
@ -312,8 +322,10 @@ class Application(object):
status = client.PRECONDITION_FAILED
return status, headers, None
def report(self, environ, calendar, content):
def report(self, environ, calendars, content):
"""Manage REPORT request."""
# TODO: support multiple calendars here
calendar = calendars[0]
headers = {'Content-Type': 'text/xml'}
answer = xmlutils.report(environ["PATH_INFO"], content, calendar)
return client.MULTI_STATUS, headers, answer

View File

@ -29,6 +29,7 @@ import codecs
from contextlib import contextmanager
import json
import os
import posixpath
import time
from radicale import config
@ -155,13 +156,60 @@ class Calendar(object):
"""Internal calendar class."""
tag = "VCALENDAR"
def __init__(self, path):
def __init__(self, path, principal=False):
"""Initialize the calendar with ``cal`` and ``user`` parameters."""
self.encoding = "utf-8"
split_path = path.split("/")
self.owner = split_path[0] if len(split_path) > 1 else None
self.path = os.path.join(FOLDER, path.replace("/", os.path.sep))
self.path = os.path.join(FOLDER, path.replace("/", os.sep))
self.local_path = path
self.is_principal = principal
@classmethod
def from_path(cls, path, depth="infinite", include_container=True):
"""Return a list of calendars and/or sub-items under the given ``path``
relative to the storage folder. If ``depth`` is "0", only the actual
object under `path` is returned. Otherwise, also sub-items are appended
to the result. If `include_container` is True (the default), the
containing object is included in the result.
"""
attributes = posixpath.normpath(path.strip("/")).split("/")
if not attributes:
return None
if attributes[-1].endswith(".ics"):
attributes.pop()
result = []
path = "/".join(attributes[:min(len(attributes), 2)])
path = path.replace("/", os.sep)
abs_path = os.path.join(FOLDER, path)
if os.path.isdir(abs_path):
if depth == "0":
result.append(cls(path, principal=True))
else:
if include_container:
result.append(cls(path, principal=True))
for f in os.walk(abs_path).next()[2]:
f_path = os.path.join(path, f)
if cls.is_vcalendar(os.path.join(abs_path, f)):
result.append(cls(f_path))
else:
calendar = cls(path)
if depth == "0":
result.append(calendar)
else:
if include_container:
result.append(calendar)
result.extend(calendar.components)
return result
@staticmethod
def is_vcalendar(path):
"""Return `True` if there is a VCALENDAR file under `path`."""
with open(path) as f:
return 'BEGIN:VCALENDAR' == f.read(15)
@staticmethod
def _parse(text, item_types, name=None):
@ -340,3 +388,7 @@ class Calendar(object):
# on exit
with open(props_path, 'w') as prop_file:
json.dump(properties, prop_file)
@property
def url(self):
return '/{}/'.format(self.local_path).replace('//', '/')

View File

@ -42,7 +42,8 @@ NAMESPACES = {
"C": "urn:ietf:params:xml:ns:caldav",
"D": "DAV:",
"CS": "http://calendarserver.org/ns/",
"ICAL": "http://apple.com/ns/ical/"}
"ICAL": "http://apple.com/ns/ical/",
"ME": "http://me.com/_namespace/"}
NAMESPACES_REV = {}
@ -104,10 +105,8 @@ def _tag_from_clark(name):
args = {
'ns': NAMESPACES_REV[match.group('namespace')],
'tag': match.group('tag')}
tag_name = '%(ns)s:%(tag)s' % args
else:
tag_name = prop.tag
return tag_name
return '%(ns)s:%(tag)s' % args
return name
def _response(code):
@ -168,7 +167,7 @@ def delete(path, calendar):
return _pretty_xml(multistatus)
def propfind(path, xml_request, calendar, depth):
def propfind(path, xml_request, calendars):
"""Read and answer PROPFIND requests.
Read rfc4918-9.1 for info.
@ -183,90 +182,115 @@ def propfind(path, xml_request, calendar, depth):
# Writing answer
multistatus = ET.Element(_tag("D", "multistatus"))
if calendar:
if depth == "0":
items = [calendar]
else:
# Depth is 1, infinity or not specified
# We limit ourselves to depth == 1
items = [calendar] + calendar.components
else:
items = []
for item in items:
is_calendar = isinstance(item, ical.Calendar)
response = ET.Element(_tag("D", "response"))
for calendar in calendars:
response = _propfind_response(path, calendar, props)
multistatus.append(response)
href = ET.Element(_tag("D", "href"))
href.text = path if is_calendar else path + item.name
response.append(href)
return _pretty_xml(multistatus)
propstat = ET.Element(_tag("D", "propstat"))
response.append(propstat)
prop = ET.Element(_tag("D", "prop"))
propstat.append(prop)
def _propfind_response(path, item, props):
is_calendar = isinstance(item, ical.Calendar)
if is_calendar:
with item.props as cal_props:
calendar_props = cal_props
for tag in props:
element = ET.Element(tag)
if tag == _tag("D", "resourcetype") and is_calendar:
tag = ET.Element(_tag("C", "calendar"))
element.append(tag)
response = ET.Element(_tag("D", "response"))
href = ET.Element(_tag("D", "href"))
href.text = item.url if is_calendar else path + item.name
response.append(href)
propstat404 = ET.Element(_tag("D", "propstat"))
propstat200 = ET.Element(_tag("D", "propstat"))
response.append(propstat200)
prop200 = ET.Element(_tag("D", "prop"))
propstat200.append(prop200)
prop404 = ET.Element(_tag("D", "prop"))
propstat404.append(prop404)
for tag in props:
element = ET.Element(tag)
is404 = False
if tag == _tag("D", "owner"):
if item.owner:
element.text = item.owner
elif tag == _tag("D", "getcontenttype"):
element.text = "text/calendar"
elif tag == _tag("D", "getetag"):
element.text = item.etag
elif tag == _tag("D", "principal-URL"):
# TODO: use a real principal URL, read rfc3744-4.2 for info
tag = ET.Element(_tag("D", "href"))
if item.owner:
tag.text = "/{}/".format(item.owner).replace("//", "/")
else:
tag.text = path
element.append(tag)
elif tag in (
_tag("D", "principal-collection-set"),
_tag("C", "calendar-user-address-set"),
_tag("C", "calendar-home-set")):
tag = ET.Element(_tag("D", "href"))
tag.text = path
element.append(tag)
elif tag == _tag("C", "supported-calendar-component-set"):
# This is not a Todo
# pylint: disable=W0511
for component in ("VTODO", "VEVENT", "VJOURNAL"):
comp = ET.Element(_tag("C", "comp"))
comp.set("name", component)
element.append(comp)
# pylint: enable=W0511
elif tag == _tag("D", "current-user-privilege-set"):
privilege = ET.Element(_tag("D", "privilege"))
privilege.append(ET.Element(_tag("D", "all")))
element.append(privilege)
elif tag == _tag("D", "supported-report-set"):
for report_name in (
"principal-property-search", "sync-collection"
"expand-property", "principal-search-property-set"):
supported = ET.Element(_tag("D", "supported-report"))
report_tag = ET.Element(_tag("D", "report"))
report_tag.text = report_name
supported.append(report_tag)
element.append(supported)
elif is_calendar:
if tag == _tag("D", "resourcetype"):
if is_calendar and not item.is_principal:
tag = ET.Element(_tag("C", "calendar"))
element.append(tag)
tag = ET.Element(_tag("D", "collection"))
element.append(tag)
elif tag == _tag("D", "owner"):
if calendar.owner:
element.text = calendar.owner
elif tag == _tag("D", "getcontenttype"):
element.text = "text/calendar"
elif tag == _tag("CS", "getctag") and is_calendar:
elif tag == _tag("CS", "getctag"):
element.text = item.etag
elif tag == _tag("D", "getetag"):
element.text = item.etag
elif tag == _tag("D", "displayname") and is_calendar:
element.text = calendar.name
elif tag == _tag("D", "principal-URL"):
# TODO: use a real principal URL, read rfc3744-4.2 for info
tag = ET.Element(_tag("D", "href"))
tag.text = path
element.append(tag)
elif tag in (
_tag("D", "principal-collection-set"),
_tag("C", "calendar-user-address-set"),
_tag("C", "calendar-home-set")):
tag = ET.Element(_tag("D", "href"))
tag.text = path
element.append(tag)
elif tag == _tag("C", "supported-calendar-component-set"):
# This is not a Todo
# pylint: disable=W0511
for component in ("VTODO", "VEVENT", "VJOURNAL"):
comp = ET.Element(_tag("C", "comp"))
comp.set("name", component)
element.append(comp)
# pylint: enable=W0511
elif tag == _tag("D", "current-user-privilege-set"):
privilege = ET.Element(_tag("D", "privilege"))
privilege.append(ET.Element(_tag("D", "all")))
element.append(privilege)
elif tag == _tag("D", "supported-report-set"):
for report_name in (
"principal-property-search", "sync-collection"
"expand-property", "principal-search-property-set"):
supported = ET.Element(_tag("D", "supported-report"))
report_tag = ET.Element(_tag("D", "report"))
report_tag.text = report_name
supported.append(report_tag)
element.append(supported)
prop.append(element)
else:
human_tag = _tag_from_clark(tag)
if human_tag in calendar_props:
element.text = calendar_props[human_tag]
else:
is404 = True
else:
is404 = True
status = ET.Element(_tag("D", "status"))
status.text = _response(200)
propstat.append(status)
if is404:
prop404.append(element)
else:
prop200.append(element)
return _pretty_xml(multistatus)
status200 = ET.Element(_tag("D", "status"))
status200.text = _response(200)
propstat200.append(status200)
status404 = ET.Element(_tag("D", "status"))
status404.text = _response(404)
propstat404.append(status404)
if len(prop404):
response.append(propstat404)
return response
def _add_propstat_to(element, tag, status_number):