Cosmetic changes (pylint)
This commit is contained in:
parent
7aca052859
commit
0fb02cd026
@ -125,9 +125,9 @@ def run():
|
||||
if args.verify_storage:
|
||||
logger.info("Verifying storage")
|
||||
try:
|
||||
Collection = storage.load(configuration)
|
||||
with Collection.acquire_lock("r"):
|
||||
if not Collection.verify():
|
||||
storage_ = storage.load(configuration)
|
||||
with storage_.acquire_lock("r"):
|
||||
if not storage_.verify():
|
||||
logger.fatal("Storage verifcation failed")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
@ -140,7 +140,7 @@ def run():
|
||||
shutdown_socket, shutdown_socket_out = socket.socketpair()
|
||||
|
||||
# SIGTERM and SIGINT (aka KeyboardInterrupt) shutdown the server
|
||||
def shutdown(*args):
|
||||
def shutdown(signal_number, stack_frame):
|
||||
shutdown_socket.sendall(b" ")
|
||||
signal.signal(signal.SIGTERM, shutdown)
|
||||
signal.signal(signal.SIGINT, shutdown)
|
||||
|
@ -138,7 +138,7 @@ class Application(
|
||||
status, headers, answer = httputils.INTERNAL_SERVER_ERROR
|
||||
answer = answer.encode("ascii")
|
||||
status = "%d %s" % (
|
||||
status, client.responses.get(status, "Unknown"))
|
||||
status.value, client.responses.get(status, "Unknown"))
|
||||
headers = [
|
||||
("Content-Length", str(len(answer)))] + list(headers)
|
||||
answers = [answer]
|
||||
|
@ -21,6 +21,5 @@
|
||||
class ApplicationHeadMixin:
|
||||
def do_HEAD(self, environ, base_prefix, path, user):
|
||||
"""Manage HEAD request."""
|
||||
status, headers, answer = self.do_GET(
|
||||
environ, base_prefix, path, user)
|
||||
status, headers, _ = self.do_GET(environ, base_prefix, path, user)
|
||||
return status, headers, None
|
||||
|
@ -172,7 +172,7 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
|
||||
xmlutils.make_tag("D", "principal-URL"),
|
||||
xmlutils.make_tag("CR", "addressbook-home-set"),
|
||||
xmlutils.make_tag("C", "calendar-home-set")) and
|
||||
collection.is_principal and is_collection):
|
||||
collection.is_principal and is_collection):
|
||||
tag = ET.Element(xmlutils.make_tag("D", "href"))
|
||||
tag.text = xmlutils.make_href(base_prefix, path)
|
||||
element.append(tag)
|
||||
@ -318,7 +318,7 @@ def xml_propfind_response(base_prefix, path, item, props, user, encoding,
|
||||
status404 = ET.Element(xmlutils.make_tag("D", "status"))
|
||||
status404.text = xmlutils.make_response(404)
|
||||
propstat404.append(status404)
|
||||
if len(prop404):
|
||||
if len(prop404) > 0:
|
||||
response.append(propstat404)
|
||||
|
||||
return response
|
||||
|
@ -30,36 +30,31 @@ from radicale import item as radicale_item
|
||||
from radicale import pathutils, storage, xmlutils
|
||||
from radicale.log import logger
|
||||
|
||||
MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()}
|
||||
|
||||
|
||||
def prepare(vobject_items, path, content_type, permissions, parent_permissions,
|
||||
tag=None, write_whole_collection=None):
|
||||
if (write_whole_collection or
|
||||
permissions and not parent_permissions):
|
||||
if (write_whole_collection or permissions and not parent_permissions):
|
||||
write_whole_collection = True
|
||||
tags = {value: key
|
||||
for key, value in xmlutils.MIMETYPES.items()}
|
||||
tag = radicale_item.predict_tag_of_whole_collection(
|
||||
vobject_items, tags.get(content_type))
|
||||
vobject_items, MIMETYPE_TAGS.get(content_type))
|
||||
if not tag:
|
||||
raise ValueError("Can't determine collection tag")
|
||||
collection_path = pathutils.strip_path(path)
|
||||
elif (write_whole_collection is not None and
|
||||
not write_whole_collection or
|
||||
not permissions and parent_permissions):
|
||||
elif (write_whole_collection is not None and not write_whole_collection or
|
||||
not permissions and parent_permissions):
|
||||
write_whole_collection = False
|
||||
if tag is None:
|
||||
tag = radicale_item.predict_tag_of_parent_collection(
|
||||
vobject_items)
|
||||
collection_path = posixpath.dirname(
|
||||
pathutils.strip_path(path))
|
||||
tag = radicale_item.predict_tag_of_parent_collection(vobject_items)
|
||||
collection_path = posixpath.dirname(pathutils.strip_path(path))
|
||||
props = None
|
||||
stored_exc_info = None
|
||||
items = []
|
||||
try:
|
||||
if tag:
|
||||
radicale_item.check_and_sanitize_items(
|
||||
vobject_items, is_collection=write_whole_collection,
|
||||
tag=tag)
|
||||
vobject_items, is_collection=write_whole_collection, tag=tag)
|
||||
if write_whole_collection and tag == "VCALENDAR":
|
||||
vobject_components = []
|
||||
vobject_item, = vobject_items
|
||||
@ -67,30 +62,26 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
|
||||
vobject_components.extend(
|
||||
getattr(vobject_item, "%s_list" % content, []))
|
||||
vobject_components_by_uid = itertools.groupby(
|
||||
sorted(vobject_components,
|
||||
key=radicale_item.get_uid),
|
||||
sorted(vobject_components, key=radicale_item.get_uid),
|
||||
radicale_item.get_uid)
|
||||
for _, components in vobject_components_by_uid:
|
||||
vobject_collection = vobject.iCalendar()
|
||||
for component in components:
|
||||
vobject_collection.add(component)
|
||||
item = radicale_item.Item(
|
||||
collection_path=collection_path,
|
||||
vobject_item=vobject_collection)
|
||||
item = radicale_item.Item(collection_path=collection_path,
|
||||
vobject_item=vobject_collection)
|
||||
item.prepare()
|
||||
items.append(item)
|
||||
elif write_whole_collection and tag == "VADDRESSBOOK":
|
||||
for vobject_item in vobject_items:
|
||||
item = radicale_item.Item(
|
||||
collection_path=collection_path,
|
||||
vobject_item=vobject_item)
|
||||
item = radicale_item.Item(collection_path=collection_path,
|
||||
vobject_item=vobject_item)
|
||||
item.prepare()
|
||||
items.append(item)
|
||||
elif not write_whole_collection:
|
||||
vobject_item, = vobject_items
|
||||
item = radicale_item.Item(
|
||||
collection_path=collection_path,
|
||||
vobject_item=vobject_item)
|
||||
item = radicale_item.Item(collection_path=collection_path,
|
||||
vobject_item=vobject_item)
|
||||
item.prepare()
|
||||
items.append(item)
|
||||
|
||||
@ -116,7 +107,6 @@ def prepare(vobject_items, path, content_type, permissions, parent_permissions,
|
||||
def items_generator():
|
||||
while items:
|
||||
yield items.pop(0)
|
||||
|
||||
return (items_generator(), tag, write_whole_collection, props,
|
||||
stored_exc_info)
|
||||
|
||||
@ -190,8 +180,8 @@ class ApplicationPutMixin:
|
||||
prepared_write_whole_collection != write_whole_collection):
|
||||
(prepared_items, prepared_tag, prepared_write_whole_collection,
|
||||
prepared_props, prepared_exc_info) = prepare(
|
||||
vobject_items, path, content_type, permissions,
|
||||
parent_permissions, tag, write_whole_collection)
|
||||
vobject_items, path, content_type, permissions,
|
||||
parent_permissions, tag, write_whole_collection)
|
||||
props = prepared_props
|
||||
if prepared_exc_info:
|
||||
logger.warning(
|
||||
|
@ -180,8 +180,6 @@ def xml_report(base_prefix, path, xml_request, collection, encoding,
|
||||
radicale_filter.prop_match(item.vobject_item, f, "CR")
|
||||
for f in filter_)
|
||||
raise ValueError("Unsupported filter test: %r" % test)
|
||||
return all(radicale_filter.prop_match(item.vobject_item, f, "CR")
|
||||
for f in filter_)
|
||||
raise ValueError("unsupported filter %r for %r" % (filter_.tag, tag))
|
||||
|
||||
while retrieved_items:
|
||||
|
@ -356,7 +356,7 @@ class Configuration:
|
||||
"%s" % (option, section, source))
|
||||
raw_value = config[section][option]
|
||||
try:
|
||||
if type_ == bool and type(raw_value) != bool:
|
||||
if type_ == bool and not isinstance(raw_value, bool):
|
||||
raw_value = _convert_to_bool(raw_value)
|
||||
new_values[section][option] = type_(raw_value)
|
||||
except Exception as e:
|
||||
|
@ -82,7 +82,7 @@ def comp_match(item, filter_, level=0):
|
||||
return False
|
||||
if (level == 0 and name != "VCALENDAR" or
|
||||
level == 1 and name not in ("VTODO", "VEVENT", "VJOURNAL")):
|
||||
logger.warning("Filtering %s is not supported" % name)
|
||||
logger.warning("Filtering %s is not supported", name)
|
||||
return True
|
||||
# Point #3 and #4 of rfc4791-9.7.1
|
||||
components = ([item.vobject_item] if level == 0
|
||||
@ -463,7 +463,7 @@ def param_filter_match(vobject_item, filter_, parent_name, ns):
|
||||
name = filter_.get("name").upper()
|
||||
children = getattr(vobject_item, "%s_list" % parent_name, [])
|
||||
condition = any(name in child.params for child in children)
|
||||
if len(filter_):
|
||||
if len(filter_) > 0:
|
||||
if filter_[0].tag == xmlutils.make_tag(ns, "text-match"):
|
||||
return condition and text_match(
|
||||
vobject_item, filter_[0], parent_name, ns, name)
|
||||
|
@ -56,7 +56,7 @@ class RemoveTracebackFilter(logging.Filter):
|
||||
return True
|
||||
|
||||
|
||||
removeTracebackFilter = RemoveTracebackFilter()
|
||||
REMOVE_TRACEBACK_FILTER = RemoveTracebackFilter()
|
||||
|
||||
|
||||
class IdentLogRecordFactory:
|
||||
@ -118,9 +118,9 @@ class ThreadStreamsHandler(logging.Handler):
|
||||
# HACK: Workaround for Android
|
||||
self.lock = RwLockWrapper()
|
||||
|
||||
def setFormatter(self, form):
|
||||
super().setFormatter(form)
|
||||
self.fallback_handler.setFormatter(form)
|
||||
def setFormatter(self, fmt):
|
||||
super().setFormatter(fmt)
|
||||
self.fallback_handler.setFormatter(fmt)
|
||||
|
||||
def emit(self, record):
|
||||
try:
|
||||
@ -186,6 +186,6 @@ def set_level(level):
|
||||
level = getattr(logging, level.upper())
|
||||
logger.setLevel(level)
|
||||
if level == logging.DEBUG:
|
||||
logger.removeFilter(removeTracebackFilter)
|
||||
logger.removeFilter(REMOVE_TRACEBACK_FILTER)
|
||||
else:
|
||||
logger.addFilter(removeTracebackFilter)
|
||||
logger.addFilter(REMOVE_TRACEBACK_FILTER)
|
||||
|
@ -167,8 +167,8 @@ class ParallelHTTPSServer(ParallelHTTPServer):
|
||||
certificate_authority = None
|
||||
|
||||
def server_bind(self):
|
||||
super().server_bind()
|
||||
"""Create server by wrapping HTTP socket in an SSL socket."""
|
||||
super().server_bind()
|
||||
self.socket = ssl.wrap_socket(
|
||||
self.socket, self.key, self.certificate, server_side=True,
|
||||
cert_reqs=ssl.CERT_REQUIRED if self.certificate_authority else
|
||||
@ -210,9 +210,8 @@ class RequestHandler(wsgiref.simple_server.WSGIRequestHandler):
|
||||
def log_request(self, code="-", size="-"):
|
||||
"""Disable request logging."""
|
||||
|
||||
def log_error(self, format, *args):
|
||||
msg = format % args
|
||||
logger.error("An error occurred during request: %s" % msg)
|
||||
def log_error(self, format_, *args):
|
||||
logger.error("An error occurred during request: %s", format_ % args)
|
||||
|
||||
def get_environ(self):
|
||||
env = super().get_environ()
|
||||
|
@ -51,9 +51,9 @@ class Collection(
|
||||
CollectionHistoryMixin, CollectionLockMixin, CollectionMetaMixin,
|
||||
CollectionSyncMixin, CollectionUploadMixin, storage.BaseCollection):
|
||||
|
||||
def __init__(self, storage, path, filesystem_path=None):
|
||||
self._storage = storage
|
||||
folder = storage._get_collection_root_folder()
|
||||
def __init__(self, storage_, path, filesystem_path=None):
|
||||
self._storage = storage_
|
||||
folder = self._storage._get_collection_root_folder()
|
||||
# Path should already be sanitized
|
||||
self._path = pathutils.strip_path(path)
|
||||
self._encoding = self._storage.configuration.get("encoding", "stock")
|
||||
|
@ -27,7 +27,7 @@ from radicale.log import logger
|
||||
class StorageDiscoverMixin:
|
||||
|
||||
def discover(self, path, depth="0", child_context_manager=(
|
||||
lambda path, href=None: contextlib.ExitStack())):
|
||||
lambda path, href=None: contextlib.ExitStack())):
|
||||
# Path should already be sanitized
|
||||
sane_path = pathutils.strip_path(path)
|
||||
attributes = sane_path.split("/") if sane_path else []
|
||||
|
@ -62,12 +62,12 @@ class CollectionUploadMixin:
|
||||
raise ValueError(
|
||||
"Failed to store item %r in temporary collection %r: %s" %
|
||||
(uid, self.path, e)) from e
|
||||
href_candidates = []
|
||||
href_candidate_funtions = []
|
||||
if os.name in ("nt", "posix"):
|
||||
href_candidates.append(
|
||||
href_candidate_funtions.append(
|
||||
lambda: uid if uid.lower().endswith(suffix.lower())
|
||||
else uid + suffix)
|
||||
href_candidates.extend((
|
||||
href_candidate_funtions.extend((
|
||||
lambda: radicale_item.get_etag(uid).strip('"') + suffix,
|
||||
lambda: radicale_item.find_available_uid(hrefs.__contains__,
|
||||
suffix)))
|
||||
@ -75,19 +75,20 @@ class CollectionUploadMixin:
|
||||
|
||||
def replace_fn(source, target):
|
||||
nonlocal href
|
||||
while href_candidates:
|
||||
href = href_candidates.pop(0)()
|
||||
while href_candidate_funtions:
|
||||
href_fn = href_candidate_funtions.pop(0)
|
||||
href = href_fn()
|
||||
if href in hrefs:
|
||||
continue
|
||||
if not pathutils.is_safe_filesystem_path_component(href):
|
||||
if not href_candidates:
|
||||
if not href_candidate_funtions:
|
||||
raise pathutils.UnsafePathError(href)
|
||||
continue
|
||||
try:
|
||||
return os.replace(source, pathutils.path_to_filesystem(
|
||||
self._filesystem_path, href))
|
||||
except OSError as e:
|
||||
if href_candidates and (
|
||||
if href_candidate_funtions and (
|
||||
os.name == "posix" and e.errno == 22 or
|
||||
os.name == "nt" and e.errno == 123):
|
||||
continue
|
||||
|
@ -57,10 +57,6 @@ class BaseTest:
|
||||
|
||||
def request(self, method, path, data=None, **args):
|
||||
"""Send a request."""
|
||||
self.application._status = None
|
||||
self.application._headers = None
|
||||
self.application._answer = None
|
||||
|
||||
for key in args:
|
||||
args[key.upper()] = args[key]
|
||||
args["REQUEST_METHOD"] = method.upper()
|
||||
|
@ -67,7 +67,7 @@ class TestBaseAuthRequests(BaseTest):
|
||||
("tmp", "bepo", 207), ("tmp", "tmp", 401), ("tmp", "", 401),
|
||||
("unk", "unk", 401), ("unk", "", 401), ("", "", 401))
|
||||
for user, password, expected_status in test_matrix:
|
||||
status, _, answer = self.request(
|
||||
status, _, _ = self.request(
|
||||
"PROPFIND", "/",
|
||||
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(
|
||||
("%s:%s" % (user, password)).encode()).decode())
|
||||
@ -163,7 +163,7 @@ class TestBaseAuthRequests(BaseTest):
|
||||
self.configuration.update(
|
||||
{"auth": {"type": "radicale.tests.custom.auth"}}, "test")
|
||||
self.application = Application(self.configuration)
|
||||
status, _, answer = self.request(
|
||||
status, _, _ = self.request(
|
||||
"PROPFIND", "/tmp", HTTP_AUTHORIZATION="Basic %s" %
|
||||
base64.b64encode(("tmp:").encode()).decode())
|
||||
assert status == 207
|
||||
|
@ -1317,7 +1317,7 @@ class BaseRequestsMixIn:
|
||||
calendar_path = "/calendar.ics/"
|
||||
status, _, _ = self.request("MKCALENDAR", calendar_path)
|
||||
assert status == 201
|
||||
sync_token, xml = self._report_sync_token(
|
||||
sync_token, _ = self._report_sync_token(
|
||||
calendar_path, "http://radicale.org/ns/sync/INVALID")
|
||||
assert not sync_token
|
||||
|
||||
@ -1326,13 +1326,12 @@ class BaseRequestsMixIn:
|
||||
calendar_path = "/calendar.ics/"
|
||||
status, _, _ = self.request("MKCALENDAR", calendar_path)
|
||||
assert status == 201
|
||||
sync_token, xml = self._report_sync_token(calendar_path)
|
||||
sync_token, _ = self._report_sync_token(calendar_path)
|
||||
event = get_file_content("event1.ics")
|
||||
event_path = posixpath.join(calendar_path, "event.ics")
|
||||
status, _, _ = self.request("PUT", event_path, event)
|
||||
assert status == 201
|
||||
new_sync_token, xml = self._report_sync_token(calendar_path,
|
||||
sync_token)
|
||||
new_sync_token, _ = self._report_sync_token(calendar_path, sync_token)
|
||||
assert sync_token != new_sync_token
|
||||
|
||||
def test_propfind_same_as_sync_collection_sync_token(self):
|
||||
@ -1340,9 +1339,8 @@ class BaseRequestsMixIn:
|
||||
calendar_path = "/calendar.ics/"
|
||||
status, _, _ = self.request("MKCALENDAR", calendar_path)
|
||||
assert status == 201
|
||||
sync_token, xml = self._report_sync_token(calendar_path)
|
||||
new_sync_token, xml = self._report_sync_token(calendar_path,
|
||||
sync_token)
|
||||
sync_token, _ = self._report_sync_token(calendar_path)
|
||||
new_sync_token, _ = self._report_sync_token(calendar_path, sync_token)
|
||||
if not self.full_sync_token_support and not new_sync_token:
|
||||
return
|
||||
assert sync_token == new_sync_token
|
||||
@ -1504,9 +1502,9 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
|
||||
|
||||
def test_hook(self):
|
||||
"""Run hook."""
|
||||
self.configuration.update({"storage": {"hook": (
|
||||
"mkdir %s" % os.path.join("collection-root", "created_by_hook"))
|
||||
}}, "test")
|
||||
self.configuration.update({"storage": {
|
||||
"hook": ("mkdir %s" % os.path.join(
|
||||
"collection-root", "created_by_hook"))}}, "test")
|
||||
self.application = Application(self.configuration)
|
||||
status, _, _ = self.request("MKCALENDAR", "/calendar.ics/")
|
||||
assert status == 201
|
||||
@ -1515,9 +1513,9 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
|
||||
|
||||
def test_hook_read_access(self):
|
||||
"""Verify that hook is not run for read accesses."""
|
||||
self.configuration.update({"storage": {"hook": (
|
||||
"mkdir %s" % os.path.join("collection-root", "created_by_hook"))
|
||||
}}, "test")
|
||||
self.configuration.update({"storage": {
|
||||
"hook": ("mkdir %s" % os.path.join(
|
||||
"collection-root", "created_by_hook"))}}, "test")
|
||||
self.application = Application(self.configuration)
|
||||
status, _, _ = self.request("PROPFIND", "/")
|
||||
assert status == 207
|
||||
@ -1536,9 +1534,9 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
|
||||
|
||||
def test_hook_principal_collection_creation(self):
|
||||
"""Verify that the hooks runs when a new user is created."""
|
||||
self.configuration.update({"storage": {"hook": (
|
||||
"mkdir %s" % os.path.join("collection-root", "created_by_hook"))
|
||||
}}, "test")
|
||||
self.configuration.update({"storage": {
|
||||
"hook": ("mkdir %s" % os.path.join(
|
||||
"collection-root", "created_by_hook"))}}, "test")
|
||||
self.application = Application(self.configuration)
|
||||
status, _, _ = self.request("PROPFIND", "/", HTTP_AUTHORIZATION=(
|
||||
"Basic " + base64.b64encode(b"user:").decode()))
|
||||
@ -1633,11 +1631,10 @@ class TestCustomStorageSystem(BaseFileSystemTest):
|
||||
"""Test custom backend loading."""
|
||||
storage_type = "radicale.tests.custom.storage_simple_sync"
|
||||
full_sync_token_support = False
|
||||
_report_sync_token = BaseRequestsMixIn._report_sync_token
|
||||
test_root = BaseRequestsMixIn.test_root
|
||||
|
||||
|
||||
# include tests related to sync token
|
||||
for s in dir(BaseRequestsMixIn):
|
||||
if s.startswith("test_") and ("_sync_" in s or s.endswith("_sync")):
|
||||
setattr(TestCustomStorageSystem, s, getattr(BaseRequestsMixIn, s))
|
||||
_report_sync_token = BaseRequestsMixIn._report_sync_token
|
||||
# include tests related to sync token
|
||||
for s in dir(BaseRequestsMixIn):
|
||||
if s.startswith("test_") and ("_sync_" in s or s.endswith("_sync")):
|
||||
locals()[s] = getattr(BaseRequestsMixIn, s)
|
||||
del s
|
||||
|
@ -77,7 +77,7 @@ class TestConfig:
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
config.load([(config_path, False)])
|
||||
e = exc_info.value
|
||||
assert ("Failed to load config file %r" % config_path) in str(e)
|
||||
assert "Failed to load config file %r" % config_path in str(e)
|
||||
|
||||
def test_load_multiple(self):
|
||||
config_path1 = self._write_config({
|
||||
@ -142,41 +142,41 @@ class TestConfig:
|
||||
assert "Invalid section 'internal'" in str(e)
|
||||
|
||||
def test_plugin_schema(self):
|
||||
PLUGIN_SCHEMA = {"auth": {"new_option": {"value": "False",
|
||||
plugin_schema = {"auth": {"new_option": {"value": "False",
|
||||
"type": bool}}}
|
||||
configuration = config.load()
|
||||
configuration.update({"auth": {"type": "new_plugin"}}, "test")
|
||||
plugin_configuration = configuration.copy(PLUGIN_SCHEMA)
|
||||
plugin_configuration = configuration.copy(plugin_schema)
|
||||
assert plugin_configuration.get("auth", "new_option") is False
|
||||
configuration.update({"auth": {"new_option": "True"}}, "test")
|
||||
plugin_configuration = configuration.copy(PLUGIN_SCHEMA)
|
||||
plugin_configuration = configuration.copy(plugin_schema)
|
||||
assert plugin_configuration.get("auth", "new_option") is True
|
||||
|
||||
def test_plugin_schema_duplicate_option(self):
|
||||
PLUGIN_SCHEMA = {"auth": {"type": {"value": "False",
|
||||
plugin_schema = {"auth": {"type": {"value": "False",
|
||||
"type": bool}}}
|
||||
configuration = config.load()
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
configuration.copy(PLUGIN_SCHEMA)
|
||||
configuration.copy(plugin_schema)
|
||||
e = exc_info.value
|
||||
assert "option already exists in 'auth': 'type'" in str(e)
|
||||
|
||||
def test_plugin_schema_invalid(self):
|
||||
PLUGIN_SCHEMA = {"server": {"new_option": {"value": "False",
|
||||
plugin_schema = {"server": {"new_option": {"value": "False",
|
||||
"type": bool}}}
|
||||
configuration = config.load()
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
configuration.copy(PLUGIN_SCHEMA)
|
||||
configuration.copy(plugin_schema)
|
||||
e = exc_info.value
|
||||
assert "not a plugin section: 'server" in str(e)
|
||||
|
||||
def test_plugin_schema_option_invalid(self):
|
||||
PLUGIN_SCHEMA = {"auth": {}}
|
||||
plugin_schema = {"auth": {}}
|
||||
configuration = config.load()
|
||||
configuration.update({"auth": {"type": "new_plugin",
|
||||
"new_option": False}}, "test")
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
configuration.copy(PLUGIN_SCHEMA)
|
||||
configuration.copy(plugin_schema)
|
||||
e = exc_info.value
|
||||
assert "Invalid option 'new_option'" in str(e)
|
||||
assert "section 'auth'" in str(e)
|
||||
|
@ -132,7 +132,7 @@ class TestBaseServerRequests:
|
||||
self.sockname = sock.getsockname()[:2]
|
||||
self.configuration.update({
|
||||
"server": {"hosts": "[%s]:%d" % self.sockname}}, "test")
|
||||
savedEaiAddrfamily = server.EAI_ADDRFAMILY
|
||||
original_eai_addrfamily = server.EAI_ADDRFAMILY
|
||||
if os.name == "nt" and server.EAI_ADDRFAMILY is None:
|
||||
# HACK: incomplete errno conversion in WINE
|
||||
server.EAI_ADDRFAMILY = -9
|
||||
@ -140,7 +140,7 @@ class TestBaseServerRequests:
|
||||
self.thread.start()
|
||||
status, _, _ = self.request("GET", "/")
|
||||
finally:
|
||||
server.EAI_ADDRFAMILY = savedEaiAddrfamily
|
||||
server.EAI_ADDRFAMILY = original_eai_addrfamily
|
||||
assert status == 302
|
||||
|
||||
def test_command_line_interface(self):
|
||||
|
@ -63,7 +63,7 @@ def pretty_xml(element, level=0):
|
||||
if not level:
|
||||
element = copy.deepcopy(element)
|
||||
i = "\n" + level * " "
|
||||
if len(element):
|
||||
if len(element) > 0:
|
||||
if not element.text or not element.text.strip():
|
||||
element.text = i + " "
|
||||
if not element.tail or not element.tail.strip():
|
||||
|
Loading…
x
Reference in New Issue
Block a user