SQLite: make postponing transaction committing possible.
This should significantly improve performance when used to write large amounts of messages. This feature is enabled through the fsync configuration option. Code refactorize around fsync. This addresses #390 (although it doesn't necessarily fix all instances of that problem yet). Github-ref: https://github.com/OfflineIMAP/offlineimap/issues/390 Originally-written-by: Giel van Schijndel <me@mortis.eu> Signed-off-by: Nicolas Sebrecht <nicolas.s-dev@laposte.net>
This commit is contained in:
parent
41c9694488
commit
b6ede627a9
@ -164,6 +164,9 @@ accounts = Test
|
|||||||
# at the expense of greater risk of message duplication in the event of a system
|
# at the expense of greater risk of message duplication in the event of a system
|
||||||
# crash or power loss. Default is true. Set it to false to disable fsync.
|
# crash or power loss. Default is true. Set it to false to disable fsync.
|
||||||
#
|
#
|
||||||
|
# SQLite honors this option since v7.0.8+. However, those SQLite improvements
|
||||||
|
# are still EXPERIMENTAL.
|
||||||
|
#
|
||||||
#fsync = true
|
#fsync = true
|
||||||
|
|
||||||
|
|
||||||
|
@ -70,6 +70,7 @@ class BaseFolder(object):
|
|||||||
|
|
||||||
self._sync_deletes = self.config.getdefaultboolean(
|
self._sync_deletes = self.config.getdefaultboolean(
|
||||||
self.repoconfname, "sync_deletes", True)
|
self.repoconfname, "sync_deletes", True)
|
||||||
|
self._dofsync = self.config.getdefaultboolean("general", "fsync", True)
|
||||||
|
|
||||||
# Determine if we're running static or dynamic folder filtering
|
# Determine if we're running static or dynamic folder filtering
|
||||||
# and check filtering status.
|
# and check filtering status.
|
||||||
@ -103,6 +104,16 @@ class BaseFolder(object):
|
|||||||
# fails if the str is utf-8
|
# fails if the str is utf-8
|
||||||
return self.name.decode('utf-8')
|
return self.name.decode('utf-8')
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
"""Starts a transaction. This will postpone (guaranteed) saving to disk
|
||||||
|
of all messages saved inside this transaction until its committed."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
"""Commits a transaction, all messages saved inside this transaction
|
||||||
|
will only now be persisted to disk."""
|
||||||
|
pass
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accountname(self):
|
def accountname(self):
|
||||||
"""Account name as string"""
|
"""Account name as string"""
|
||||||
@ -118,6 +129,9 @@ class BaseFolder(object):
|
|||||||
else:
|
else:
|
||||||
return self.repository.should_sync_folder(self.ffilter_name)
|
return self.repository.should_sync_folder(self.ffilter_name)
|
||||||
|
|
||||||
|
def dofsync(self):
|
||||||
|
return self._dofsync
|
||||||
|
|
||||||
def suggeststhreads(self):
|
def suggeststhreads(self):
|
||||||
"""Returns True if this folder suggests using threads for actions.
|
"""Returns True if this folder suggests using threads for actions.
|
||||||
|
|
||||||
@ -891,38 +905,39 @@ class BaseFolder(object):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
for num, uid in enumerate(copylist):
|
with self:
|
||||||
# Bail out on CTRL-C or SIGTERM.
|
for num, uid in enumerate(copylist):
|
||||||
if offlineimap.accounts.Account.abort_NOW_signal.is_set():
|
# Bail out on CTRL-C or SIGTERM.
|
||||||
break
|
if offlineimap.accounts.Account.abort_NOW_signal.is_set():
|
||||||
|
break
|
||||||
|
|
||||||
if uid == 0:
|
if uid == 0:
|
||||||
self.ui.warn("Assertion that UID != 0 failed; ignoring message.")
|
self.ui.warn("Assertion that UID != 0 failed; ignoring message.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if uid > 0 and dstfolder.uidexists(uid):
|
if uid > 0 and dstfolder.uidexists(uid):
|
||||||
# dstfolder has message with that UID already, only update status.
|
# dstfolder has message with that UID already, only update status.
|
||||||
flags = self.getmessageflags(uid)
|
flags = self.getmessageflags(uid)
|
||||||
rtime = self.getmessagetime(uid)
|
rtime = self.getmessagetime(uid)
|
||||||
statusfolder.savemessage(uid, None, flags, rtime)
|
statusfolder.savemessage(uid, None, flags, rtime)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self.ui.copyingmessage(uid, num+1, num_to_copy, self, dstfolder)
|
self.ui.copyingmessage(uid, num+1, num_to_copy, self, dstfolder)
|
||||||
# Exceptions are caught in copymessageto().
|
# Exceptions are caught in copymessageto().
|
||||||
if self.suggeststhreads():
|
if self.suggeststhreads():
|
||||||
self.waitforthread()
|
self.waitforthread()
|
||||||
thread = threadutil.InstanceLimitedThread(
|
thread = threadutil.InstanceLimitedThread(
|
||||||
self.getinstancelimitnamespace(),
|
self.getinstancelimitnamespace(),
|
||||||
target=self.copymessageto,
|
target=self.copymessageto,
|
||||||
name="Copy message from %s:%s"% (self.repository, self),
|
name="Copy message from %s:%s"% (self.repository, self),
|
||||||
args=(uid, dstfolder, statusfolder)
|
args=(uid, dstfolder, statusfolder)
|
||||||
)
|
)
|
||||||
thread.start()
|
thread.start()
|
||||||
threads.append(thread)
|
threads.append(thread)
|
||||||
else:
|
else:
|
||||||
self.copymessageto(uid, dstfolder, statusfolder, register=0)
|
self.copymessageto(uid, dstfolder, statusfolder, register=0)
|
||||||
for thread in threads:
|
for thread in threads:
|
||||||
thread.join() # Block until all "copy" threads are done.
|
thread.join() # Block until all "copy" threads are done.
|
||||||
|
|
||||||
# Execute new mail hook if we have new mail.
|
# Execute new mail hook if we have new mail.
|
||||||
if self.have_newmail:
|
if self.have_newmail:
|
||||||
|
@ -92,6 +92,20 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
|||||||
LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock()
|
LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock()
|
||||||
self._databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
|
self._databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
|
||||||
|
|
||||||
|
self._in_transactions = 0
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
if not self.dofsync():
|
||||||
|
assert self.connection is not None
|
||||||
|
self._in_transactions += 1
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
if not self.dofsync():
|
||||||
|
assert self._in_transactions > 0
|
||||||
|
self._in_transactions -= 1
|
||||||
|
if self._in_transactions < 1:
|
||||||
|
self.connection.commit()
|
||||||
|
|
||||||
def openfiles(self):
|
def openfiles(self):
|
||||||
# Make sure sqlite is in multithreading SERIALIZE mode.
|
# Make sure sqlite is in multithreading SERIALIZE mode.
|
||||||
assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.'
|
assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.'
|
||||||
@ -169,7 +183,8 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
|||||||
else:
|
else:
|
||||||
self.connection.execute(sql, args)
|
self.connection.execute(sql, args)
|
||||||
success = True
|
success = True
|
||||||
self.connection.commit()
|
if not self._in_transactions:
|
||||||
|
self.connection.commit()
|
||||||
except sqlite.OperationalError as e:
|
except sqlite.OperationalError as e:
|
||||||
if e.args[0] == 'cannot commit - no transaction is active':
|
if e.args[0] == 'cannot commit - no transaction is active':
|
||||||
pass
|
pass
|
||||||
|
@ -60,7 +60,6 @@ class MaildirFolder(BaseFolder):
|
|||||||
def __init__(self, root, name, sep, repository):
|
def __init__(self, root, name, sep, repository):
|
||||||
self.sep = sep # needs to be set before super().__init__
|
self.sep = sep # needs to be set before super().__init__
|
||||||
super(MaildirFolder, self).__init__(name, repository)
|
super(MaildirFolder, self).__init__(name, repository)
|
||||||
self.dofsync = self.config.getdefaultboolean("general", "fsync", True)
|
|
||||||
self.root = root
|
self.root = root
|
||||||
# check if we should use a different infosep to support Win file systems
|
# check if we should use a different infosep to support Win file systems
|
||||||
self.wincompatible = self.config.getdefaultboolean(
|
self.wincompatible = self.config.getdefaultboolean(
|
||||||
@ -330,7 +329,7 @@ class MaildirFolder(BaseFolder):
|
|||||||
fd.write(content)
|
fd.write(content)
|
||||||
# Make sure the data hits the disk.
|
# Make sure the data hits the disk.
|
||||||
fd.flush()
|
fd.flush()
|
||||||
if self.dofsync:
|
if self.dofsync():
|
||||||
os.fsync(fd)
|
os.fsync(fd)
|
||||||
fd.close()
|
fd.close()
|
||||||
|
|
||||||
|
@ -34,7 +34,6 @@ class MappedIMAPFolder(IMAPFolder):
|
|||||||
|
|
||||||
Instance variables (self.):
|
Instance variables (self.):
|
||||||
dryrun: boolean.
|
dryrun: boolean.
|
||||||
dofsync: boolean for fsync calls.
|
|
||||||
r2l: dict mapping message uids: self.r2l[remoteuid]=localuid
|
r2l: dict mapping message uids: self.r2l[remoteuid]=localuid
|
||||||
l2r: dict mapping message uids: self.r2l[localuid]=remoteuid
|
l2r: dict mapping message uids: self.r2l[localuid]=remoteuid
|
||||||
#TODO: what is the difference, how are they used?
|
#TODO: what is the difference, how are they used?
|
||||||
@ -44,7 +43,6 @@ class MappedIMAPFolder(IMAPFolder):
|
|||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
IMAPFolder.__init__(self, *args, **kwargs)
|
IMAPFolder.__init__(self, *args, **kwargs)
|
||||||
self.dryrun = self.config.getdefaultboolean("general", "dry-run", True)
|
self.dryrun = self.config.getdefaultboolean("general", "dry-run", True)
|
||||||
self.dofsync = self.config.getdefaultboolean("general", "fsync", True)
|
|
||||||
self.maplock = Lock()
|
self.maplock = Lock()
|
||||||
self.diskr2l, self.diskl2r = self._loadmaps()
|
self.diskr2l, self.diskl2r = self._loadmaps()
|
||||||
self.r2l, self.l2r = None, None
|
self.r2l, self.l2r = None, None
|
||||||
@ -114,7 +112,7 @@ class MappedIMAPFolder(IMAPFolder):
|
|||||||
with open(mapfilenametmp, 'wt') as mapfilefd:
|
with open(mapfilenametmp, 'wt') as mapfilefd:
|
||||||
for (key, value) in self.diskl2r.items():
|
for (key, value) in self.diskl2r.items():
|
||||||
mapfilefd.write("%d:%d\n"% (key, value))
|
mapfilefd.write("%d:%d\n"% (key, value))
|
||||||
if self.dofsync is True:
|
if self.dofsync():
|
||||||
fsync(mapfilefd)
|
fsync(mapfilefd)
|
||||||
# The lock is released when the file descriptor ends.
|
# The lock is released when the file descriptor ends.
|
||||||
shutil.move(mapfilenametmp, mapfilename)
|
shutil.move(mapfilenametmp, mapfilename)
|
||||||
|
Loading…
Reference in New Issue
Block a user