Merge pull request #343 from Unrud/paths

Secure path handling
This commit is contained in:
Guillaume Ayoub 2015-12-24 15:48:14 +01:00
commit 18c88642fb
5 changed files with 161 additions and 50 deletions

View File

@ -32,7 +32,6 @@ import os
import sys import sys
import pprint import pprint
import base64 import base64
import posixpath
import socket import socket
import ssl import ssl
import wsgiref.simple_server import wsgiref.simple_server
@ -48,7 +47,7 @@ except ImportError:
from urlparse import urlparse from urlparse import urlparse
# pylint: enable=F0401,E0611 # pylint: enable=F0401,E0611
from . import auth, config, ical, log, rights, storage, xmlutils from . import auth, config, ical, log, pathutils, rights, storage, xmlutils
VERSION = "1.0.1" VERSION = "1.0.1"
@ -177,12 +176,9 @@ class Application(object):
@staticmethod @staticmethod
def sanitize_uri(uri): def sanitize_uri(uri):
"""Unquote and remove /../ to prevent access to other data.""" """Unquote and make absolute to prevent access to other data."""
uri = unquote(uri) uri = unquote(uri)
trailing_slash = "/" if uri.endswith("/") else "" return pathutils.sanitize_path(uri)
uri = posixpath.normpath(uri)
trailing_slash = "" if uri == "/" else trailing_slash
return uri + trailing_slash
def collect_allowed_items(self, items, user): def collect_allowed_items(self, items, user):
"""Get items from request that user is allowed to access.""" """Get items from request that user is allowed to access."""
@ -249,20 +245,24 @@ class Application(object):
headers = pprint.pformat(self.headers_log(environ)) headers = pprint.pformat(self.headers_log(environ))
log.LOGGER.debug("Request headers:\n%s" % headers) log.LOGGER.debug("Request headers:\n%s" % headers)
# Strip base_prefix from request URI
base_prefix = config.get("server", "base_prefix") base_prefix = config.get("server", "base_prefix")
if environ["PATH_INFO"].startswith(base_prefix): if environ["PATH_INFO"].startswith(base_prefix):
# Sanitize request URI environ["PATH_INFO"] = environ["PATH_INFO"][len(base_prefix):]
environ["PATH_INFO"] = self.sanitize_uri(
"/%s" % environ["PATH_INFO"][len(base_prefix):])
log.LOGGER.debug("Sanitized path: %s", environ["PATH_INFO"])
elif config.get("server", "can_skip_base_prefix"): elif config.get("server", "can_skip_base_prefix"):
log.LOGGER.debug( log.LOGGER.debug(
"Skipped already sanitized path: %s", environ["PATH_INFO"]) "Prefix already stripped from path: %s", environ["PATH_INFO"])
else: else:
# Request path not starting with base_prefix, not allowed # Request path not starting with base_prefix, not allowed
log.LOGGER.debug( log.LOGGER.debug(
"Path not starting with prefix: %s", environ["PATH_INFO"]) "Path not starting with prefix: %s", environ["PATH_INFO"])
environ["PATH_INFO"] = None status, headers, _ = NOT_ALLOWED
start_response(status, list(headers.items()))
return []
# Sanitize request URI
environ["PATH_INFO"] = self.sanitize_uri(environ["PATH_INFO"])
log.LOGGER.debug("Sanitized path: %s", environ["PATH_INFO"])
path = environ["PATH_INFO"] path = environ["PATH_INFO"]

View File

@ -26,13 +26,14 @@ Define the main classes of a collection as seen from the server.
""" """
import os import os
import posixpath
import hashlib import hashlib
import re import re
from uuid import uuid4 from uuid import uuid4
from random import randint from random import randint
from contextlib import contextmanager from contextlib import contextmanager
from . import pathutils
def serialize(tag, headers=(), items=()): def serialize(tag, headers=(), items=()):
"""Return a text corresponding to given collection ``tag``. """Return a text corresponding to given collection ``tag``.
@ -183,8 +184,9 @@ class Collection(object):
""" """
self.encoding = "utf-8" self.encoding = "utf-8"
split_path = path.split("/") # path should already be sanitized
self.path = path if path != "." else "" self.path = pathutils.sanitize_path(path).strip("/")
split_path = self.path.split("/")
if principal and split_path and self.is_node(self.path): if principal and split_path and self.is_node(self.path):
# Already existing principal collection # Already existing principal collection
self.owner = split_path[0] self.owner = split_path[0]
@ -215,8 +217,8 @@ class Collection(object):
if path is None: if path is None:
return [] return []
# First do normpath and then strip, to prevent access to FOLDER/../ # path should already be sanitized
sane_path = posixpath.normpath(path.replace(os.sep, "/")).strip("/") sane_path = pathutils.sanitize_path(path).strip("/")
attributes = sane_path.split("/") attributes = sane_path.split("/")
if not attributes: if not attributes:
return [] return []

84
radicale/pathutils.py Normal file
View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
#
# This file is part of Radicale Server - Calendar Server
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
"""
Helper functions for working with paths
"""
import os
import posixpath
from . import log
def sanitize_path(path):
"""Make absolute (with leading slash) to prevent access to other data.
Preserves an potential trailing slash."""
trailing_slash = "/" if path.endswith("/") else ""
path = posixpath.normpath(path)
new_path = "/"
for part in path.split("/"):
if not part or part in (".", ".."):
continue
new_path = posixpath.join(new_path, part)
trailing_slash = "" if new_path.endswith("/") else trailing_slash
return new_path + trailing_slash
def is_safe_path_component(path):
"""Checks if path is a single component of a path and is safe to join"""
if not path:
return False
head, _ = posixpath.split(path)
if head:
return False
if path in (".", ".."):
return False
return True
def is_safe_filesystem_path_component(path):
"""Checks if path is a single component of a local filesystem path
and is safe to join"""
if not path:
return False
drive, _ = os.path.splitdrive(path)
if drive:
return False
head, _ = os.path.split(path)
if head:
return False
if path in (os.curdir, os.pardir):
return False
return True
def path_to_filesystem(path, base_folder):
"""Converts path to a local filesystem path relative to base_folder
in a secure manner or raises ValueError."""
sane_path = sanitize_path(path).strip("/")
safe_path = base_folder
if not sane_path:
return safe_path
for part in sane_path.split("/"):
if not is_safe_filesystem_path_component(part):
log.LOGGER.debug("Can't translate path safely to filesystem: %s",
path)
raise ValueError("Unsafe path")
safe_path = os.path.join(safe_path, part)
return safe_path

View File

@ -28,7 +28,8 @@ import json
import time import time
import sys import sys
from contextlib import contextmanager from contextlib import contextmanager
from .. import config, ical
from .. import config, ical, log, pathutils
FOLDER = os.path.expanduser(config.get("storage", "filesystem_folder")) FOLDER = os.path.expanduser(config.get("storage", "filesystem_folder"))
@ -63,59 +64,67 @@ def open(path, mode="r"):
class Collection(ical.Collection): class Collection(ical.Collection):
"""Collection stored in a flat ical file.""" """Collection stored in a flat ical file."""
@property @property
def _path(self): def _filesystem_path(self):
"""Absolute path of the file at local ``path``.""" """Absolute path of the file at local ``path``."""
return os.path.join(FOLDER, self.path.replace("/", os.sep)) return pathutils.path_to_filesystem(self.path, FOLDER)
@property @property
def _props_path(self): def _props_path(self):
"""Absolute path of the file storing the collection properties.""" """Absolute path of the file storing the collection properties."""
return self._path + ".props" return self._filesystem_path + ".props"
def _create_dirs(self): def _create_dirs(self):
"""Create folder storing the collection if absent.""" """Create folder storing the collection if absent."""
if not os.path.exists(os.path.dirname(self._path)): if not os.path.exists(os.path.dirname(self._filesystem_path)):
os.makedirs(os.path.dirname(self._path)) os.makedirs(os.path.dirname(self._filesystem_path))
def save(self, text): def save(self, text):
self._create_dirs() self._create_dirs()
with open(self._path, "w") as fd: with open(self._filesystem_path, "w") as fd:
fd.write(text) fd.write(text)
def delete(self): def delete(self):
os.remove(self._path) os.remove(self._filesystem_path)
os.remove(self._props_path) os.remove(self._props_path)
@property @property
def text(self): def text(self):
try: try:
with open(self._path) as fd: with open(self._filesystem_path) as fd:
return fd.read() return fd.read()
except IOError: except IOError:
return "" return ""
@classmethod @classmethod
def children(cls, path): def children(cls, path):
abs_path = os.path.join(FOLDER, path.replace("/", os.sep)) filesystem_path = pathutils.path_to_filesystem(path, FOLDER)
_, directories, files = next(os.walk(abs_path)) _, directories, files = next(os.walk(filesystem_path))
for filename in directories + files: for filename in directories + files:
# make sure that the local filename can be translated
# into an internal path
if not pathutils.is_safe_path_component(filename):
log.LOGGER.debug("Skipping unsupported filename: %s",
filename)
continue
rel_filename = posixpath.join(path, filename) rel_filename = posixpath.join(path, filename)
if cls.is_node(rel_filename) or cls.is_leaf(rel_filename): if cls.is_node(rel_filename) or cls.is_leaf(rel_filename):
yield cls(rel_filename) yield cls(rel_filename)
@classmethod @classmethod
def is_node(cls, path): def is_node(cls, path):
abs_path = os.path.join(FOLDER, path.replace("/", os.sep)) filesystem_path = pathutils.path_to_filesystem(path, FOLDER)
return os.path.isdir(abs_path) return os.path.isdir(filesystem_path)
@classmethod @classmethod
def is_leaf(cls, path): def is_leaf(cls, path):
abs_path = os.path.join(FOLDER, path.replace("/", os.sep)) filesystem_path = pathutils.path_to_filesystem(path, FOLDER)
return os.path.isfile(abs_path) and not abs_path.endswith(".props") return (os.path.isfile(filesystem_path) and not
filesystem_path.endswith(".props"))
@property @property
def last_modified(self): def last_modified(self):
modification_time = time.gmtime(os.path.getmtime(self._path)) modification_time = \
time.gmtime(os.path.getmtime(self._filesystem_path))
return time.strftime("%a, %d %b %Y %H:%M:%S +0000", modification_time) return time.strftime("%a, %d %b %Y %H:%M:%S +0000", modification_time)
@property @property

View File

@ -30,13 +30,14 @@ import sys
from . import filesystem from . import filesystem
from .. import ical from .. import ical
from .. import log from .. import log
from .. import pathutils
class Collection(filesystem.Collection): class Collection(filesystem.Collection):
"""Collection stored in several files per calendar.""" """Collection stored in several files per calendar."""
def _create_dirs(self): def _create_dirs(self):
if not os.path.exists(self._path): if not os.path.exists(self._filesystem_path):
os.makedirs(self._path) os.makedirs(self._filesystem_path)
@property @property
def headers(self): def headers(self):
@ -52,17 +53,28 @@ class Collection(filesystem.Collection):
name = ( name = (
component.name if sys.version_info[0] >= 3 else component.name if sys.version_info[0] >= 3 else
component.name.encode(filesystem.FILESYSTEM_ENCODING)) component.name.encode(filesystem.FILESYSTEM_ENCODING))
path = os.path.join(self._path, name) if not pathutils.is_safe_filesystem_path_component(name):
with filesystem.open(path, "w") as fd: log.LOGGER.debug(
"Can't tranlate name safely to filesystem, "
"skipping component: %s", name)
continue
filesystem_path = os.path.join(self._filesystem_path, name)
with filesystem.open(filesystem_path, "w") as fd:
fd.write(text) fd.write(text)
def delete(self): def delete(self):
shutil.rmtree(self._path) shutil.rmtree(self._filesystem_path)
os.remove(self._props_path) os.remove(self._props_path)
def remove(self, name): def remove(self, name):
if os.path.exists(os.path.join(self._path, name)): if not pathutils.is_safe_filesystem_path_component(name):
os.remove(os.path.join(self._path, name)) log.LOGGER.debug(
"Can't tranlate name safely to filesystem, "
"skipping component: %s", name)
return
filesystem_path = os.path.join(self._filesystem_path, name)
if os.path.exists(filesystem_path):
os.remove(filesystem_path)
@property @property
def text(self): def text(self):
@ -70,14 +82,14 @@ class Collection(filesystem.Collection):
ical.Timezone, ical.Event, ical.Todo, ical.Journal, ical.Card) ical.Timezone, ical.Event, ical.Todo, ical.Journal, ical.Card)
items = set() items = set()
try: try:
filenames = os.listdir(self._path) filenames = os.listdir(self._filesystem_path)
except (OSError, IOError) as e: except (OSError, IOError) as e:
log.LOGGER.info('Error while reading collection %r: %r' log.LOGGER.info('Error while reading collection %r: %r'
% (self._path, e)) % (self._filesystem_path, e))
return "" return ""
for filename in filenames: for filename in filenames:
path = os.path.join(self._path, filename) path = os.path.join(self._filesystem_path, filename)
try: try:
with filesystem.open(path) as fd: with filesystem.open(path) as fd:
items.update(self._parse(fd.read(), components)) items.update(self._parse(fd.read(), components))
@ -90,17 +102,21 @@ class Collection(filesystem.Collection):
@classmethod @classmethod
def is_node(cls, path): def is_node(cls, path):
path = os.path.join(filesystem.FOLDER, path.replace("/", os.sep)) filesystem_path = pathutils.path_to_filesystem(path,
return os.path.isdir(path) and not os.path.exists(path + ".props") filesystem.FOLDER)
return (os.path.isdir(filesystem_path) and
not os.path.exists(filesystem_path + ".props"))
@classmethod @classmethod
def is_leaf(cls, path): def is_leaf(cls, path):
path = os.path.join(filesystem.FOLDER, path.replace("/", os.sep)) filesystem_path = pathutils.path_to_filesystem(path,
return os.path.isdir(path) and os.path.exists(path + ".props") filesystem.FOLDER)
return (os.path.isdir(filesystem_path) and
os.path.exists(path + ".props"))
@property @property
def last_modified(self): def last_modified(self):
last = max([ last = max([
os.path.getmtime(os.path.join(self._path, filename)) os.path.getmtime(os.path.join(self._filesystem_path, filename))
for filename in os.listdir(self._path)] or [0]) for filename in os.listdir(self._filesystem_path)] or [0])
return time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime(last)) return time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime(last))