Skip to content

Commit

Permalink
Django backend: limit batch size for bulk_create operations (#3713)
Browse files Browse the repository at this point in the history
PostgreSQL has a `MaxAllocSize` limit that defaults to 1 GB [1].
If you try to insert more than that in one go (e.g. during import of a
large AiiDA export file), you encounter the error:

    psycopg2.errors.ProgramLimitExceeded: out of memory
    DETAIL:  Cannot enlarge string buffer containing 0 bytes by 1257443654 more bytes.

This commit avoids this issue by setting a batch size for `bulk_create`
operations. The size of the batch is configurable through the new
`db.batch_size` configuration option using `verdi config`.

[1] https://github.com/postgres/postgres/blob/master/src/include/utils/memutils.h#L40
    (definition of the "max alloc" size)
  • Loading branch information
ltalirz authored Feb 12, 2020
1 parent d08cffa commit b7bcf96
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 68 deletions.
3 changes: 2 additions & 1 deletion aiida/backends/djsite/db/migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from aiida.backends.manager import SCHEMA_VERSION_KEY, SCHEMA_VERSION_DESCRIPTION
from aiida.backends.manager import SCHEMA_GENERATION_KEY, SCHEMA_GENERATION_DESCRIPTION
from aiida.common.exceptions import AiidaException, DbContentError
from aiida.manage.configuration import get_config_option


class DeserializationException(AiidaException):
Expand Down Expand Up @@ -649,7 +650,7 @@ def set_value(
# so in general it is good to recursively clean
# all sub-items.
self.del_value(key, subspecifier_value=subspecifier_value)
cls.objects.bulk_create(to_store)
cls.objects.bulk_create(to_store, batch_size=get_config_option('db.batch_size'))

if with_transaction:
transaction.savepoint_commit(sid)
Expand Down
137 changes: 73 additions & 64 deletions aiida/manage/configuration/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,70 +21,6 @@
'Option', ['name', 'key', 'valid_type', 'valid_values', 'default', 'description', 'global_only']
)


def get_option(option_name):
    """Look up a configuration option by its name.

    The option is built from the entry stored under ``option_name`` in ``CONFIG_OPTIONS``.

    :param option_name: the name of the configuration option
    :return: the configuration option
    :raises ValueError: if the configuration option does not exist
    """
    try:
        return Option(option_name, **CONFIG_OPTIONS[option_name])
    except KeyError:
        # Report unknown option names as a ValueError, which is what callers are told to expect.
        message = 'the option {} does not exist'.format(option_name)
        raise ValueError(message)


def get_option_names():
    """Return a list of available option names.

    :return: list of available option names
    """
    # Materialize the names: ``dict.keys()`` is a live view, not the list this function documents.
    return list(CONFIG_OPTIONS)


def parse_option(option_name, option_value):
    """Parse and validate a value for a configuration option.

    The raw value is converted according to the ``valid_type`` of the option and, if the option defines
    ``valid_values``, the converted value is checked against them.

    :param option_name: the name of the configuration option
    :param option_value: the option value
    :return: a tuple of the option and the parsed value
    :raises ValueError: if the option does not exist, a string passed for a boolean option is not one of the
        recognized literals, or the parsed value is not among the valid values of the option
    :raises NotImplementedError: if the ``valid_type`` of the option is not supported
    """
    option = get_option(option_name)

    if option.valid_type == 'bool':
        if isinstance(option_value, str):
            # Strings are matched against explicit literals, because ``bool('false')`` would be ``True``.
            normalized = option_value.strip().lower()
            if normalized in ['0', 'false', 'f']:
                value = False
            elif normalized in ['1', 'true', 't']:
                value = True
            else:
                raise ValueError('option {} expects a boolean value'.format(option.name))
        else:
            value = bool(option_value)
    elif option.valid_type == 'string':
        value = str(option_value)
    elif option.valid_type == 'int':
        value = int(option_value)
    elif option.valid_type == 'list_of_str':
        value = option_value.split()
    else:
        raise NotImplementedError('Type string {} not implemented yet'.format(option.valid_type))

    if option.valid_values is not None and value not in option.valid_values:
        # Stringify each valid value: ``str.join`` raises ``TypeError`` on non-string items (e.g. for an
        # ``int`` option), which would hide the intended ``ValueError``.
        raise ValueError(
            '{} is not among the list of accepted values for option {}.\nThe valid values are: '
            '{}'.format(value, option.name, ', '.join(str(valid) for valid in option.valid_values))
        )

    return option, value


CONFIG_OPTIONS = {
'runner.poll.interval': {
'key': 'runner_poll_interval',
Expand All @@ -102,6 +38,16 @@ def parse_option(option_name, option_value):
'description': 'The timeout in seconds for calls to the circus client',
'global_only': False,
},
'db.batch_size': {
'key': 'db_batch_size',
'valid_type': 'int',
'valid_values': None,
'default': 100000,
'description':
'Batch size for bulk CREATE operations in the database. Avoids hitting MaxAllocSize of PostgreSQL'
'(1GB) when creating large numbers of database records in one go.',
'global_only': False,
},
'verdi.shell.auto_import': {
'key': 'verdi_shell_auto_import',
'valid_type': 'string',
Expand Down Expand Up @@ -223,3 +169,66 @@ def parse_option(option_name, option_value):
'global_only': False,
},
}


def get_option(option_name):
    """Look up a configuration option by its name.

    The option is built from the entry stored under ``option_name`` in ``CONFIG_OPTIONS``.

    :param option_name: the name of the configuration option
    :return: the configuration option
    :raises ValueError: if the configuration option does not exist
    """
    try:
        return Option(option_name, **CONFIG_OPTIONS[option_name])
    except KeyError:
        # Report unknown option names as a ValueError, which is what callers are told to expect.
        message = 'the option {} does not exist'.format(option_name)
        raise ValueError(message)


def get_option_names():
    """Return a list of available option names.

    :return: list of available option names
    """
    # Materialize the names: ``dict.keys()`` is a live view, not the list this function documents.
    return list(CONFIG_OPTIONS)


def parse_option(option_name, option_value):
    """Parse and validate a value for a configuration option.

    The raw value is converted according to the ``valid_type`` of the option and, if the option defines
    ``valid_values``, the converted value is checked against them.

    :param option_name: the name of the configuration option
    :param option_value: the option value
    :return: a tuple of the option and the parsed value
    :raises ValueError: if the option does not exist, a string passed for a boolean option is not one of the
        recognized literals, or the parsed value is not among the valid values of the option
    :raises NotImplementedError: if the ``valid_type`` of the option is not supported
    """
    option = get_option(option_name)

    if option.valid_type == 'bool':
        if isinstance(option_value, str):
            # Strings are matched against explicit literals, because ``bool('false')`` would be ``True``.
            normalized = option_value.strip().lower()
            if normalized in ['0', 'false', 'f']:
                value = False
            elif normalized in ['1', 'true', 't']:
                value = True
            else:
                raise ValueError('option {} expects a boolean value'.format(option.name))
        else:
            value = bool(option_value)
    elif option.valid_type == 'string':
        value = str(option_value)
    elif option.valid_type == 'int':
        value = int(option_value)
    elif option.valid_type == 'list_of_str':
        value = option_value.split()
    else:
        raise NotImplementedError('Type string {} not implemented yet'.format(option.valid_type))

    if option.valid_values is not None and value not in option.valid_values:
        # Stringify each valid value: ``str.join`` raises ``TypeError`` on non-string items (e.g. for an
        # ``int`` option), which would hide the intended ``ValueError``.
        raise ValueError(
            '{} is not among the list of accepted values for option {}.\nThe valid values are: '
            '{}'.format(value, option.name, ', '.join(str(valid) for valid in option.valid_values))
        )

    return option, value
11 changes: 8 additions & 3 deletions aiida/tools/importexport/dbimport/backends/django/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from aiida.tools.importexport.common.config import entity_names_to_signatures
from aiida.tools.importexport.common.utils import export_shard_uuid
from aiida.tools.importexport.dbimport.backends.utils import deserialize_field, merge_comment, merge_extras
from aiida.manage.configuration import get_config_option


def import_data_dj(
Expand Down Expand Up @@ -229,6 +230,10 @@ def import_data_dj(
# IMPORT DATA #
###############
# DO ALL WITH A TRANSACTION

# batch size for bulk create operations
batch_size = get_config_option('db.batch_size')

with transaction.atomic():
foreign_ids_reverse_mappings = {}
new_entries = {}
Expand Down Expand Up @@ -471,9 +476,9 @@ def import_data_dj(
if 'mtime' in [field.name for field in model._meta.local_fields]:
with models.suppress_auto_now([(model, ['mtime'])]):
# Store them all in once; however, the PK are not set in this way...
model.objects.bulk_create(objects_to_create)
model.objects.bulk_create(objects_to_create, batch_size=batch_size)
else:
model.objects.bulk_create(objects_to_create)
model.objects.bulk_create(objects_to_create, batch_size=batch_size)

# Get back the just-saved entries
just_saved_queryset = model.objects.filter(
Expand Down Expand Up @@ -625,7 +630,7 @@ def import_data_dj(
if not silent:
print(' ({} new links...)'.format(len(links_to_store)))

models.DbLink.objects.bulk_create(links_to_store)
models.DbLink.objects.bulk_create(links_to_store, batch_size=batch_size)
else:
if not silent:
print(' (0 new links...)')
Expand Down

0 comments on commit b7bcf96

Please sign in to comment.