Use sys.platform instead of os.name

mypy only recognizes sys.platform
This commit is contained in:
Unrud 2022-02-01 17:53:46 +01:00
parent 523960bc9f
commit 0221fc357b
7 changed files with 21 additions and 33 deletions

View File

@ -39,11 +39,11 @@ from radicale.log import logger
def run() -> None: def run() -> None:
"""Run Radicale as a standalone server.""" """Run Radicale as a standalone server."""
exit_signal_numbers = [signal.SIGTERM, signal.SIGINT] exit_signal_numbers = [signal.SIGTERM, signal.SIGINT]
if os.name == "posix":
exit_signal_numbers.append(signal.SIGHUP)
exit_signal_numbers.append(signal.SIGQUIT)
if sys.platform == "win32": if sys.platform == "win32":
exit_signal_numbers.append(signal.SIGBREAK) exit_signal_numbers.append(signal.SIGBREAK)
else:
exit_signal_numbers.append(signal.SIGHUP)
exit_signal_numbers.append(signal.SIGQUIT)
# Raise SystemExit when signal arrives to run cleanup code # Raise SystemExit when signal arrives to run cleanup code
# (like destructors, try-finish etc.), otherwise the process exits # (like destructors, try-finish etc.), otherwise the process exits

View File

@ -69,7 +69,7 @@ if sys.platform == "win32":
ctypes.wintypes.DWORD, ctypes.wintypes.DWORD,
ctypes.POINTER(Overlapped)] ctypes.POINTER(Overlapped)]
unlock_file_ex.restype = ctypes.wintypes.BOOL unlock_file_ex.restype = ctypes.wintypes.BOOL
elif os.name == "posix": else:
import fcntl import fcntl
if sys.platform == "linux": if sys.platform == "linux":
@ -127,16 +127,13 @@ class RwLock:
except OSError as e: except OSError as e:
raise RuntimeError("Locking the storage failed: %s" % e raise RuntimeError("Locking the storage failed: %s" % e
) from e ) from e
elif os.name == "posix": else:
_cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH _cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
try: try:
fcntl.flock(lock_file.fileno(), _cmd) fcntl.flock(lock_file.fileno(), _cmd)
except OSError as e: except OSError as e:
raise RuntimeError("Locking the storage failed: %s" % e raise RuntimeError("Locking the storage failed: %s" % e
) from e ) from e
else:
raise RuntimeError("Locking the storage failed: "
"Unsupported operating system")
with self._lock: with self._lock:
if self._writer or mode == "w" and self._readers != 0: if self._writer or mode == "w" and self._readers != 0:
raise RuntimeError("Locking the storage failed: " raise RuntimeError("Locking the storage failed: "
@ -196,7 +193,7 @@ def rename_exchange(src: str, dst: str) -> None:
def fsync(fd: int) -> None: def fsync(fd: int) -> None:
if os.name == "posix" and hasattr(fcntl, "F_FULLFSYNC"): if sys.platform != "win32" and hasattr(fcntl, "F_FULLFSYNC"):
fcntl.fcntl(fd, fcntl.F_FULLFSYNC) fcntl.fcntl(fd, fcntl.F_FULLFSYNC)
else: else:
os.fsync(fd) os.fsync(fd)

View File

@ -17,6 +17,7 @@
# along with Radicale. If not, see <http://www.gnu.org/licenses/>. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import os import os
import sys
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import IO, AnyStr, ClassVar, Iterator, Optional, Type from typing import IO, AnyStr, ClassVar, Iterator, Optional, Type
@ -93,7 +94,7 @@ class StorageBase(storage.BaseStorage):
""" """
if not self._filesystem_fsync: if not self._filesystem_fsync:
return return
if os.name == "posix": if sys.platform != "win32":
try: try:
fd = os.open(path, 0) fd = os.open(path, 0)
try: try:

View File

@ -68,11 +68,11 @@ class StoragePartLock(StorageBase):
# from sending SIGINT etc. # from sending SIGINT etc.
preexec_fn = None preexec_fn = None
creationflags = 0 creationflags = 0
if os.name == "posix": if sys.platform == "win32":
creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
else:
# Process group is also used to identify child processes # Process group is also used to identify child processes
preexec_fn = os.setpgrp preexec_fn = os.setpgrp
elif sys.platform == "win32":
creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
command = self._hook % { command = self._hook % {
"user": shlex.quote(user or "Anonymous")} "user": shlex.quote(user or "Anonymous")}
logger.debug("Running storage hook") logger.debug("Running storage hook")
@ -89,7 +89,7 @@ class StoragePartLock(StorageBase):
p.wait() p.wait()
raise raise
finally: finally:
if os.name == "posix": if sys.platform != "win32":
# Kill remaining children identified by process group # Kill remaining children identified by process group
with contextlib.suppress(OSError): with contextlib.suppress(OSError):
os.killpg(p.pid, signal.SIGKILL) os.killpg(p.pid, signal.SIGKILL)

View File

@ -16,6 +16,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import errno
import os import os
import pickle import pickle
import sys import sys
@ -76,15 +77,12 @@ class CollectionPartUpload(CollectionPartGet, CollectionPartCache,
raise ValueError( raise ValueError(
"Failed to store item %r in temporary collection %r: %s" % "Failed to store item %r in temporary collection %r: %s" %
(uid, self.path, e)) from e (uid, self.path, e)) from e
href_candidate_funtions = [] href_candidate_funtions = [
if os.name == "posix" or sys.platform == "win32": lambda: uid if uid.lower().endswith(suffix.lower())
href_candidate_funtions.append( else uid + suffix,
lambda: uid if uid.lower().endswith(suffix.lower())
else uid + suffix)
href_candidate_funtions.extend((
lambda: radicale_item.get_etag(uid).strip('"') + suffix, lambda: radicale_item.get_etag(uid).strip('"') + suffix,
lambda: radicale_item.find_available_uid(hrefs.__contains__, lambda: radicale_item.find_available_uid(
suffix))) hrefs.__contains__, suffix)]
href = f = None href = f = None
while href_candidate_funtions: while href_candidate_funtions:
href = href_candidate_funtions.pop(0)() href = href_candidate_funtions.pop(0)()
@ -101,7 +99,8 @@ class CollectionPartUpload(CollectionPartGet, CollectionPartCache,
break break
except OSError as e: except OSError as e:
if href_candidate_funtions and ( if href_candidate_funtions and (
os.name == "posix" and e.errno == 22 or sys.platform != "win32" and
e.errno == errno.EINVAL or
sys.platform == "win32" and e.errno == 123): sys.platform == "win32" and e.errno == 123):
continue continue
raise raise

View File

@ -219,7 +219,7 @@ class TestBaseServerRequests(BaseTest):
finally: finally:
p.terminate() p.terminate()
p.wait() p.wait()
if os.name == "posix": if sys.platform != "win32":
assert p.returncode == 0 assert p.returncode == 0
def test_command_line_interface_with_bool_options(self) -> None: def test_command_line_interface_with_bool_options(self) -> None:

View File

@ -22,7 +22,6 @@ Tests for storage backends.
import os import os
import shutil import shutil
import sys
from typing import ClassVar, cast from typing import ClassVar, cast
import pytest import pytest
@ -100,8 +99,6 @@ class TestMultiFileSystem(BaseTest):
assert answer1 == answer2 assert answer1 == answer2
assert os.path.exists(os.path.join(cache_folder, "event1.ics")) assert os.path.exists(os.path.join(cache_folder, "event1.ics"))
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
def test_put_whole_calendar_uids_used_as_file_names(self) -> None: def test_put_whole_calendar_uids_used_as_file_names(self) -> None:
"""Test if UIDs are used as file names.""" """Test if UIDs are used as file names."""
_TestBaseRequests.test_put_whole_calendar( _TestBaseRequests.test_put_whole_calendar(
@ -110,8 +107,6 @@ class TestMultiFileSystem(BaseTest):
_, answer = self.get("/calendar.ics/%s.ics" % uid) _, answer = self.get("/calendar.ics/%s.ics" % uid)
assert "\r\nUID:%s\r\n" % uid in answer assert "\r\nUID:%s\r\n" % uid in answer
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
def test_put_whole_calendar_random_uids_used_as_file_names(self) -> None: def test_put_whole_calendar_random_uids_used_as_file_names(self) -> None:
"""Test if UIDs are used as file names.""" """Test if UIDs are used as file names."""
_TestBaseRequests.test_put_whole_calendar_without_uids( _TestBaseRequests.test_put_whole_calendar_without_uids(
@ -127,8 +122,6 @@ class TestMultiFileSystem(BaseTest):
assert answer is not None assert answer is not None
assert "\r\nUID:%s\r\n" % uid in answer assert "\r\nUID:%s\r\n" % uid in answer
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
def test_put_whole_addressbook_uids_used_as_file_names(self) -> None: def test_put_whole_addressbook_uids_used_as_file_names(self) -> None:
"""Test if UIDs are used as file names.""" """Test if UIDs are used as file names."""
_TestBaseRequests.test_put_whole_addressbook( _TestBaseRequests.test_put_whole_addressbook(
@ -137,8 +130,6 @@ class TestMultiFileSystem(BaseTest):
_, answer = self.get("/contacts.vcf/%s.vcf" % uid) _, answer = self.get("/contacts.vcf/%s.vcf" % uid)
assert "\r\nUID:%s\r\n" % uid in answer assert "\r\nUID:%s\r\n" % uid in answer
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
def test_put_whole_addressbook_random_uids_used_as_file_names( def test_put_whole_addressbook_random_uids_used_as_file_names(
self) -> None: self) -> None:
"""Test if UIDs are used as file names.""" """Test if UIDs are used as file names."""