Skip to content

Commit

Permalink
Improve the efficiency of DbLog migration for Django (#2949)
Browse files Browse the repository at this point in the history
The old implementation was using Django's ORM to get the count and the
contents of certain rows in the `DbLog` table. However, this was generating
extremely inefficient queries that made the migration run for days on
big databases. A similar problem was encountered with the original
SqlAlchemy implementation. That was solved in an earlier commit
`80a0aa9117f399a5385a30ba0ce3e2afc7016a4e`, which created a more
efficient query. We now replace the Django ORM solution with the same
custom SQL query as used for SqlAlchemy. This should significantly
improve the efficiency of this migration.
  • Loading branch information
sphuber authored May 29, 2019
1 parent c4eb8a0 commit 6da891f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 38 deletions.
129 changes: 97 additions & 32 deletions aiida/backends/djsite/db/migrations/0024_dblog_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import print_function
import sys
from six.moves import zip
import click

# Remove when https://github.com/PyCQA/pylint/issues/1931 is fixed
Expand All @@ -36,44 +37,92 @@
leg_workflow_prefix = 'aiida.workflows.user.'


def get_legacy_workflow_log_number(schema_editor):
    """Return the number of log records that correspond to legacy workflows.

    :param schema_editor: schema editor whose database connection is used for the raw query
    :return: the count of matching ``db_dblog`` rows
    """
    # Raw SQL is used instead of the ORM because the equivalent ORM query was
    # prohibitively slow on large databases.
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("""
            SELECT COUNT(*) FROM db_dblog
            WHERE
                (db_dblog.objname LIKE 'aiida.workflows.user.%')
            """)
        rows = cursor.fetchall()
    return rows[0][0]


def get_unknown_entity_log_number(schema_editor):
    """Return the number of log records that correspond to unknown entities.

    An "unknown entity" is any record whose ``objname`` matches neither the
    node prefix nor the legacy workflow prefix.

    :param schema_editor: schema editor whose database connection is used for the raw query
    :return: the count of matching ``db_dblog`` rows
    """
    # Raw SQL is used instead of the ORM because the equivalent ORM query was
    # prohibitively slow on large databases.
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("""
            SELECT COUNT(*) FROM db_dblog
            WHERE
                (db_dblog.objname NOT LIKE 'node.%') AND
                (db_dblog.objname NOT LIKE 'aiida.workflows.user.%')
            """)
        rows = cursor.fetchall()
    return rows[0][0]


def get_logs_with_no_nodes_number(schema_editor):
    """Return the number of log records that do not correspond to an existing node.

    :param schema_editor: schema editor whose database connection is used for the raw query
    :return: the count of ``db_dblog`` rows with a node-like ``objname`` but no
        matching row in ``db_dbnode``
    """
    # Raw SQL is used instead of the ORM because the equivalent ORM query was
    # prohibitively slow on large databases.
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("""
            SELECT COUNT(*) FROM db_dblog
            WHERE
                (db_dblog.objname LIKE 'node.%') AND NOT EXISTS
                (SELECT 1 FROM db_dbnode WHERE db_dbnode.id = db_dblog.objpk LIMIT 1)
            """)
        rows = cursor.fetchall()
    return rows[0][0]


def get_serialized_legacy_workflow_logs(schema_editor):
    """Return the JSON-serialized log records that correspond to legacy workflows.

    :param schema_editor: schema editor whose database connection is used for the raw query
    :return: JSON string with the matching ``db_dblog`` rows serialized as dictionaries
    """
    from aiida.backends.sqlalchemy.utils import dumps_json

    # Keys must stay in sync with the column order of the SELECT below.
    keys = ['id', 'time', 'loggername', 'levelname', 'objpk', 'objname', 'message', 'metadata']

    # Raw SQL is used instead of the ORM because the equivalent ORM query was
    # prohibitively slow on large databases.
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("""
            SELECT db_dblog.id, db_dblog.time, db_dblog.loggername, db_dblog.levelname, db_dblog.objpk,
                db_dblog.objname, db_dblog.message, db_dblog.metadata FROM db_dblog
            WHERE
                (db_dblog.objname LIKE 'aiida.workflows.user.%')
            """)
        return dumps_json([dict(zip(keys, row)) for row in cursor.fetchall()])


def get_serialized_unknown_entity_logs(schema_editor):
    """Return the JSON-serialized log records that correspond to unknown entities.

    An "unknown entity" is any record whose ``objname`` matches neither the
    node prefix nor the legacy workflow prefix.

    :param schema_editor: schema editor whose database connection is used for the raw query
    :return: JSON string with the matching ``db_dblog`` rows serialized as dictionaries
    """
    from aiida.backends.sqlalchemy.utils import dumps_json

    # Keys must stay in sync with the column order of the SELECT below.
    keys = ['id', 'time', 'loggername', 'levelname', 'objpk', 'objname', 'message', 'metadata']

    # Raw SQL is used instead of the ORM because the equivalent ORM query was
    # prohibitively slow on large databases.
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("""
            SELECT db_dblog.id, db_dblog.time, db_dblog.loggername, db_dblog.levelname, db_dblog.objpk,
                db_dblog.objname, db_dblog.message, db_dblog.metadata FROM db_dblog
            WHERE
                (db_dblog.objname NOT LIKE 'node.%') AND
                (db_dblog.objname NOT LIKE 'aiida.workflows.user.%')
            """)
        return dumps_json([dict(zip(keys, row)) for row in cursor.fetchall()])


def get_serialized_logs_with_no_nodes(schema_editor):
    """Return the JSON-serialized log records that do not correspond to an existing node.

    :param schema_editor: schema editor whose database connection is used for the raw query
    :return: JSON string with the ``db_dblog`` rows that have a node-like ``objname``
        but no matching row in ``db_dbnode``, serialized as dictionaries
    """
    from aiida.backends.sqlalchemy.utils import dumps_json

    # Keys must stay in sync with the column order of the SELECT below.
    keys = ['id', 'time', 'loggername', 'levelname', 'objpk', 'objname', 'message', 'metadata']

    # Raw SQL is used instead of the ORM because the equivalent ORM query was
    # prohibitively slow on large databases.
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("""
            SELECT db_dblog.id, db_dblog.time, db_dblog.loggername, db_dblog.levelname, db_dblog.objpk,
                db_dblog.objname, db_dblog.message, db_dblog.metadata FROM db_dblog
            WHERE
                (db_dblog.objname LIKE 'node.%') AND NOT EXISTS
                (SELECT 1 FROM db_dbnode WHERE db_dbnode.id = db_dblog.objpk LIMIT 1)
            """)
        return dumps_json([dict(zip(keys, row)) for row in cursor.fetchall()])


def set_new_uuid(apps, _):
Expand All @@ -86,18 +135,17 @@ def set_new_uuid(apps, _):
log.save(update_fields=['uuid'])


def export_and_clean_workflow_logs(apps, _):
def export_and_clean_workflow_logs(apps, schema_editor):
"""
Export the logs records that correspond to legacy workflows and to unknown entities.
"""
from tempfile import NamedTemporaryFile

DbLog = apps.get_model('db', 'DbLog')
DbNode = apps.get_model('db', 'DbNode')

lwf_number = get_legacy_workflow_log_number(DbLog)
other_number = get_unknown_entity_log_number(DbLog)
log_no_node_number = get_logs_with_no_nodes_number(DbLog, DbNode)
lwf_number = get_legacy_workflow_log_number(schema_editor)
other_number = get_unknown_entity_log_number(schema_editor)
log_no_node_number = get_logs_with_no_nodes_number(schema_editor)

# If there are no legacy workflow log records or log records of an unknown entity
if lwf_number == 0 and other_number == 0 and log_no_node_number == 0:
Expand All @@ -120,44 +168,61 @@ def export_and_clean_workflow_logs(apps, _):
with NamedTemporaryFile(
prefix='legagy_wf_logs-', suffix='.log', dir='.', delete=delete_on_close, mode='w+') as handle:
filename = handle.name
handle.write(get_serialized_legacy_workflow_logs(DbLog))
handle.write(get_serialized_legacy_workflow_logs(schema_editor))

# If delete_on_close is False, we are running for the user and add additional message of file location
if not delete_on_close:
click.echo('Exported legacy workflow logs to {}'.format(filename))

# Now delete the records
DbLog.objects.filter(objname__startswith=leg_workflow_prefix).delete()
with schema_editor.connection.cursor() as cursor:
cursor.execute(("""
DELETE FROM db_dblog
WHERE
(db_dblog.objname LIKE 'aiida.workflows.user.%')
"""))

# Exporting unknown log records
if other_number != 0:
# Get the records and write them to file
with NamedTemporaryFile(
prefix='unknown_entity_logs-', suffix='.log', dir='.', delete=delete_on_close, mode='w+') as handle:
filename = handle.name
handle.write(get_serialized_unknown_entity_logs(DbLog))
handle.write(get_serialized_unknown_entity_logs(schema_editor))

# If delete_on_close is False, we are running for the user and add additional message of file location
if not delete_on_close:
click.echo('Exported unexpected entity logs to {}'.format(filename))

# Now delete the records
DbLog.objects.exclude(objname__startswith=node_prefix).exclude(objname__startswith=leg_workflow_prefix).delete()
with schema_editor.connection.cursor() as cursor:
cursor.execute(("""
DELETE FROM db_dblog WHERE
(db_dblog.objname NOT LIKE 'node.%') AND
(db_dblog.objname NOT LIKE 'aiida.workflows.user.%')
"""))

# Exporting log records that don't correspond to nodes
if log_no_node_number != 0:
# Get the records and write them to file
with NamedTemporaryFile(
prefix='no_node_entity_logs-', suffix='.log', dir='.', delete=delete_on_close, mode='w+') as handle:
filename = handle.name
handle.write(get_serialized_logs_with_no_nodes(DbLog, DbNode))
handle.write(get_serialized_logs_with_no_nodes(schema_editor))

# If delete_on_close is False, we are running for the user and add additional message of file location
if not delete_on_close:
click.echo('Exported entity logs that don\'t correspond to nodes to {}'.format(filename))

# Now delete the records
DbLog.objects.exclude(objpk__in=DbNode.objects.values('id')).filter(objname__startswith=node_prefix).delete()
with schema_editor.connection.cursor() as cursor:
cursor.execute(("""
DELETE FROM db_dblog WHERE
(db_dblog.objname LIKE 'node.%') AND NOT EXISTS
(SELECT 1 FROM db_dbnode WHERE db_dbnode.id = db_dblog.objpk LIMIT 1)
"""))


def clean_dblog_metadata(apps, _):
Expand Down
13 changes: 7 additions & 6 deletions aiida/backends/djsite/db/subtests/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def setUp(self):
self.migrate_to = [(self.app, self.migrate_to)]
executor = MigrationExecutor(connection)
self.apps = executor.loader.project_state(self.migrate_from).apps
self.schema_editor = connection.schema_editor()

# Reverse to the original migration
executor.migrate(self.migrate_from)
Expand Down Expand Up @@ -452,9 +453,9 @@ def setUpBeforeMigration(self):
serialized_param_data = dumps_json(list(param_data))
# Getting the serialized logs for the unknown entity logs (as the export migration fuction
# provides them) - this should coincide to the above
serialized_unknown_exp_logs = update_024.get_serialized_unknown_entity_logs(DbLog)
serialized_unknown_exp_logs = update_024.get_serialized_unknown_entity_logs(self.schema_editor)
# Getting their number
unknown_exp_logs_number = update_024.get_unknown_entity_log_number(DbLog)
unknown_exp_logs_number = update_024.get_unknown_entity_log_number(self.schema_editor)
self.to_check['Dict'] = (serialized_param_data, serialized_unknown_exp_logs,
unknown_exp_logs_number)

Expand All @@ -466,8 +467,8 @@ def setUpBeforeMigration(self):
serialized_leg_wf_logs = dumps_json(list(leg_wf))
# Getting the serialized logs for the legacy workflow logs (as the export migration function
# provides them) - this should coincide to the above
serialized_leg_wf_exp_logs = update_024.get_serialized_legacy_workflow_logs(DbLog)
eg_wf_exp_logs_number = update_024.get_legacy_workflow_log_number(DbLog)
serialized_leg_wf_exp_logs = update_024.get_serialized_legacy_workflow_logs(self.schema_editor)
eg_wf_exp_logs_number = update_024.get_legacy_workflow_log_number(self.schema_editor)
self.to_check['WorkflowNode'] = (serialized_leg_wf_logs, serialized_leg_wf_exp_logs, eg_wf_exp_logs_number)

# Getting the serialized logs that don't correspond to a DbNode record
Expand All @@ -477,8 +478,8 @@ def setUpBeforeMigration(self):
serialized_logs_no_node = dumps_json(list(logs_no_node))
# Getting the serialized logs that don't correspond to a node (as the export migration function
# provides them) - this should coincide to the above
serialized_logs_exp_no_node = update_024.get_serialized_logs_with_no_nodes(DbLog, DbNode)
logs_no_node_number = update_024.get_logs_with_no_nodes_number(DbLog, DbNode)
serialized_logs_exp_no_node = update_024.get_serialized_logs_with_no_nodes(self.schema_editor)
logs_no_node_number = update_024.get_logs_with_no_nodes_number(self.schema_editor)
self.to_check['NoNode'] = (serialized_logs_no_node, serialized_logs_exp_no_node, logs_no_node_number)

def tearDown(self):
Expand Down

0 comments on commit 6da891f

Please sign in to comment.