Direct creation of files for batch uploads
This commit is contained in:
parent
7b79c00ae2
commit
63e00ca677
@ -68,8 +68,7 @@ class Collection(
|
|||||||
return self._path
|
return self._path
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _atomic_write(self, path, mode="w", newline=None, sync_directory=True,
|
def _atomic_write(self, path, mode="w", newline=None):
|
||||||
replace_fn=os.replace):
|
|
||||||
directory = os.path.dirname(path)
|
directory = os.path.dirname(path)
|
||||||
tmp = NamedTemporaryFile(
|
tmp = NamedTemporaryFile(
|
||||||
mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
|
mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
|
||||||
@ -77,19 +76,14 @@ class Collection(
|
|||||||
try:
|
try:
|
||||||
yield tmp
|
yield tmp
|
||||||
tmp.flush()
|
tmp.flush()
|
||||||
try:
|
self._storage._fsync(tmp)
|
||||||
self._storage._fsync(tmp.fileno())
|
|
||||||
except OSError as e:
|
|
||||||
raise RuntimeError("Fsync'ing file %r failed: %s" %
|
|
||||||
(path, e)) from e
|
|
||||||
tmp.close()
|
tmp.close()
|
||||||
replace_fn(tmp.name, path)
|
os.replace(tmp.name, path)
|
||||||
except BaseException:
|
except BaseException:
|
||||||
tmp.close()
|
tmp.close()
|
||||||
os.remove(tmp.name)
|
os.remove(tmp.name)
|
||||||
raise
|
raise
|
||||||
if sync_directory:
|
self._storage._sync_directory(directory)
|
||||||
self._storage._sync_directory(directory)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_modified(self):
|
def last_modified(self):
|
||||||
@ -124,9 +118,13 @@ class Storage(
|
|||||||
"storage", "filesystem_folder")
|
"storage", "filesystem_folder")
|
||||||
return os.path.join(filesystem_folder, "collection-root")
|
return os.path.join(filesystem_folder, "collection-root")
|
||||||
|
|
||||||
def _fsync(self, fd):
|
def _fsync(self, f):
|
||||||
if self.configuration.get("storage", "_filesystem_fsync"):
|
if self.configuration.get("storage", "_filesystem_fsync"):
|
||||||
pathutils.fsync(fd)
|
try:
|
||||||
|
pathutils.fsync(f.fileno())
|
||||||
|
except OSError as e:
|
||||||
|
raise RuntimeError("Fsync'ing file %r failed: %s" %
|
||||||
|
(f.name, e)) from e
|
||||||
|
|
||||||
def _sync_directory(self, path):
|
def _sync_directory(self, path):
|
||||||
"""Sync directory to disk.
|
"""Sync directory to disk.
|
||||||
@ -140,7 +138,7 @@ class Storage(
|
|||||||
try:
|
try:
|
||||||
fd = os.open(path, 0)
|
fd = os.open(path, 0)
|
||||||
try:
|
try:
|
||||||
self._fsync(fd)
|
pathutils.fsync(fd)
|
||||||
finally:
|
finally:
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
|
@ -71,36 +71,34 @@ class CollectionUploadMixin:
|
|||||||
lambda: radicale_item.get_etag(uid).strip('"') + suffix,
|
lambda: radicale_item.get_etag(uid).strip('"') + suffix,
|
||||||
lambda: radicale_item.find_available_uid(hrefs.__contains__,
|
lambda: radicale_item.find_available_uid(hrefs.__contains__,
|
||||||
suffix)))
|
suffix)))
|
||||||
href = None
|
href = f = None
|
||||||
|
while href_candidate_funtions:
|
||||||
def replace_fn(source, target):
|
href = href_candidate_funtions.pop(0)()
|
||||||
nonlocal href
|
if href in hrefs:
|
||||||
while href_candidate_funtions:
|
continue
|
||||||
href_fn = href_candidate_funtions.pop(0)
|
if not pathutils.is_safe_filesystem_path_component(href):
|
||||||
href = href_fn()
|
if not href_candidate_funtions:
|
||||||
if href in hrefs:
|
raise pathutils.UnsafePathError(href)
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
f = open(pathutils.path_to_filesystem(
|
||||||
|
self._filesystem_path, href),
|
||||||
|
"w", newline="", encoding=self._encoding)
|
||||||
|
break
|
||||||
|
except OSError as e:
|
||||||
|
if href_candidate_funtions and (
|
||||||
|
os.name == "posix" and e.errno == 22 or
|
||||||
|
os.name == "nt" and e.errno == 123):
|
||||||
continue
|
continue
|
||||||
if not pathutils.is_safe_filesystem_path_component(href):
|
raise
|
||||||
if not href_candidate_funtions:
|
with f:
|
||||||
raise pathutils.UnsafePathError(href)
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
return os.replace(source, pathutils.path_to_filesystem(
|
|
||||||
self._filesystem_path, href))
|
|
||||||
except OSError as e:
|
|
||||||
if href_candidate_funtions and (
|
|
||||||
os.name == "posix" and e.errno == 22 or
|
|
||||||
os.name == "nt" and e.errno == 123):
|
|
||||||
continue
|
|
||||||
raise
|
|
||||||
|
|
||||||
with self._atomic_write(os.path.join(self._filesystem_path, "ign"),
|
|
||||||
newline="", sync_directory=False,
|
|
||||||
replace_fn=replace_fn) as f:
|
|
||||||
f.write(item.serialize())
|
f.write(item.serialize())
|
||||||
|
f.flush()
|
||||||
|
self._storage._fsync(f)
|
||||||
hrefs.add(href)
|
hrefs.add(href)
|
||||||
with self._atomic_write(os.path.join(cache_folder, href), "wb",
|
with open(os.path.join(cache_folder, href), "wb") as f:
|
||||||
sync_directory=False) as f:
|
|
||||||
pickle.dump(cache_content, f)
|
pickle.dump(cache_content, f)
|
||||||
|
f.flush()
|
||||||
|
self._storage._fsync(f)
|
||||||
self._storage._sync_directory(cache_folder)
|
self._storage._sync_directory(cache_folder)
|
||||||
self._storage._sync_directory(self._filesystem_path)
|
self._storage._sync_directory(self._filesystem_path)
|
||||||
|
Loading…
Reference in New Issue
Block a user