From c8044b4755ec0c8fd0cdde30e12fff5678f8aa4a Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Fri, 21 Feb 2020 13:03:24 -0500
Subject: [PATCH 1/2] migrate event table primary keys from integer to bigint

see: https://github.com/ansible/awx/issues/6010
---
 awx/main/dispatch/worker/base.py              |   6 +-
 awx/main/migrations/0113_v370_event_bigint.py | 118 ++++++++++++++++++
 awx/main/models/__init__.py                   |  22 ++++
 awx/main/models/events.py                     |   5 +
 awx/main/tasks.py                             |  52 +++++++-
 5 files changed, 200 insertions(+), 3 deletions(-)
 create mode 100644 awx/main/migrations/0113_v370_event_bigint.py

diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index ef6270ff9080..40601adf62bf 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -101,7 +101,6 @@ def process_task(self, body):
     def run(self, *args, **kwargs):
         signal.signal(signal.SIGINT, self.stop)
         signal.signal(signal.SIGTERM, self.stop)
-        self.worker.on_start()
 
         # Child should implement other things here
 
@@ -115,6 +114,7 @@ def stop(self, signum, frame):
 class AWXConsumerRedis(AWXConsumerBase):
     def run(self, *args, **kwargs):
         super(AWXConsumerRedis, self).run(*args, **kwargs)
+        self.worker.on_start()
 
         queue = redis.Redis.from_url(settings.BROKER_URL)
         while True:
@@ -130,12 +130,16 @@ def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)
 
         logger.warn(f"Running worker {self.name} listening to queues {self.queues}")
+        init = False
 
         while True:
             try:
                 with pg_bus_conn() as conn:
                     for queue in self.queues:
                         conn.listen(queue)
+                    if init is False:
+                        self.worker.on_start()
+                        init = True
                     for e in conn.events():
                         self.process_task(json.loads(e.payload))
                 if self.should_stop:
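A note on the dispatcher change above: `worker.on_start()` used to run before the PG consumer issued its `LISTEN`s, so a notification published while startup work was still running could be missed. Moving the call after `conn.listen(queue)` (guarded by `init` so reconnects don't re-run it) closes that window. A minimal standalone sketch of the same ordering in psycopg2 — the DSN, channel name, and handler here are hypothetical, not AWX code:

```python
import select

import psycopg2
import psycopg2.extensions


def on_start():
    # placeholder for startup work that may itself trigger notifications
    print('running startup tasks')


conn = psycopg2.connect('dbname=awx')  # assumed DSN
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute('LISTEN my_queue;')  # subscribe *first*...
on_start()                       # ...so nothing published here is lost

while True:
    # block until the socket is readable, then drain pending notifications
    select.select([conn], [], [], 5)
    conn.poll()
    while conn.notifies:
        notify = conn.notifies.pop(0)
        print('got payload:', notify.payload)
```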
diff --git a/awx/main/migrations/0113_v370_event_bigint.py b/awx/main/migrations/0113_v370_event_bigint.py
new file mode 100644
index 000000000000..e8b5af664fa5
--- /dev/null
+++ b/awx/main/migrations/0113_v370_event_bigint.py
@@ -0,0 +1,118 @@
+# Generated by Django 2.2.8 on 2020-02-21 16:31
+
+from django.db import migrations, models, connection
+
+
+def migrate_event_data(apps, schema_editor):
+    # see: https://github.com/ansible/awx/issues/6010
+    #
+    # the goal of this function is to end with event tables (e.g., main_jobevent)
+    # that have a bigint primary key (because the old usage of an integer
+    # numeric isn't enough, as its range is about 2.1B, see:
+    # https://www.postgresql.org/docs/9.1/datatype-numeric.html)
+
+    # unfortunately, we can't do this with a simple ALTER TABLE, because
+    # for tables with hundreds of millions or billions of rows, the ALTER TABLE
+    # can take *hours* on modest hardware.
+    #
+    # the approach in this migration means that post-migration, event data will
+    # *not* immediately show up, but will be repopulated over time progressively;
+    # the trade-off here is not having to wait hours for the full data migration
+    # before you can start and run AWX again (including new playbook runs)
+    for tblname in (
+        'main_jobevent', 'main_inventoryupdateevent',
+        'main_projectupdateevent', 'main_adhoccommandevent',
+        'main_systemjobevent'
+    ):
+        with connection.cursor() as cursor:
+            # rename the current event table
+            cursor.execute(
+                f'ALTER TABLE {tblname} RENAME TO _old_{tblname};'
+            )
+            # create a *new* table with the same schema
+            cursor.execute(
+                f'CREATE TABLE {tblname} (LIKE _old_{tblname} INCLUDING ALL);'
+            )
+            # alter the *new* table so that the primary key is a big int
+            cursor.execute(
+                f'ALTER TABLE {tblname} ALTER COLUMN id TYPE bigint USING id::bigint;'
+            )
+
+            # recreate the counter for the new table's primary key so that it
+            # starts where the *old* table left off (we have to do this because
+            # the counter changed from an int to a bigint)
+            cursor.execute(f'DROP SEQUENCE IF EXISTS "{tblname}_id_seq" CASCADE;')
+            cursor.execute(f'CREATE SEQUENCE "{tblname}_id_seq";')
+            cursor.execute(
+                f'ALTER TABLE "{tblname}" ALTER COLUMN "id" '
+                f"SET DEFAULT nextval('{tblname}_id_seq');"
+            )
+            cursor.execute(
+                f"SELECT setval('{tblname}_id_seq', (SELECT MAX(id) FROM _old_{tblname}), true);"
+            )
+
+            # replace the BTREE index on main_jobevent.job_id with
+            # a BRIN index to drastically improve per-UJ lookup performance
+            # see: https://info.crunchydata.com/blog/postgresql-brin-indexes-big-data-performance-with-minimal-storage
+            if tblname == 'main_jobevent':
+                cursor.execute("SELECT indexname FROM pg_indexes WHERE tablename='main_jobevent' AND indexdef LIKE '%USING btree (job_id)';")
+                old_index = cursor.fetchone()[0]
+                cursor.execute(f'DROP INDEX {old_index}')
+                cursor.execute('CREATE INDEX main_jobevent_job_id_brin_idx ON main_jobevent USING brin (job_id);')
+
+            # remove all of the indexes and constraints from the old table
+            # (they just slow down the data migration)
+            cursor.execute(f"SELECT indexname, indexdef FROM pg_indexes WHERE tablename='_old_{tblname}' AND indexname != '{tblname}_pkey';")
+            indexes = cursor.fetchall()
+
+            cursor.execute(f"SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = '_old_{tblname}'::regclass AND conname != '{tblname}_pkey';")
+            constraints = cursor.fetchall()
+
+            for indexname, indexdef in indexes:
+                cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
+            for conname, contype, condef in constraints:
+                cursor.execute(f'ALTER TABLE _old_{tblname} DROP CONSTRAINT IF EXISTS {conname}')
+
+
+class FakeAlterField(migrations.AlterField):
+
+    def database_forwards(self, *args):
+        # this is intentionally left blank, because we're
+        # going to accomplish the migration with some custom raw SQL
+        pass
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0112_v370_workflow_node_identifier'),
+    ]
+
+    operations = [
+        migrations.RunPython(migrate_event_data),
+        FakeAlterField(
+            model_name='adhoccommandevent',
+            name='id',
+            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
+        ),
+        FakeAlterField(
+            model_name='inventoryupdateevent',
+            name='id',
+            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
+        ),
+        FakeAlterField(
+            model_name='jobevent',
+            name='id',
+            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
+        ),
+        FakeAlterField(
+            model_name='projectupdateevent',
+            name='id',
+            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
+        ),
+        FakeAlterField(
+            model_name='systemjobevent',
+            name='id',
+            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
+        ),
+    ]
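A non-obvious detail in the sequence handoff above is the third argument to `setval()`: passing `true` marks the value as already consumed, so the first row inserted into the new table continues numbering after the old table's maximum id instead of reusing it. A tiny self-contained illustration (the scratch database and table name `ev` are made up for this sketch):

```python
import psycopg2

conn = psycopg2.connect('dbname=scratch')  # assumed throwaway database
with conn, conn.cursor() as cur:
    cur.execute('CREATE TABLE ev (id integer PRIMARY KEY);')
    cur.execute('INSERT INTO ev VALUES (41), (42);')
    cur.execute('CREATE SEQUENCE ev_id_seq;')
    cur.execute("ALTER TABLE ev ALTER COLUMN id SET DEFAULT nextval('ev_id_seq');")
    # is_called=true => the next nextval() returns MAX(id) + 1, not MAX(id)
    cur.execute("SELECT setval('ev_id_seq', (SELECT MAX(id) FROM ev), true);")
    cur.execute('INSERT INTO ev DEFAULT VALUES RETURNING id;')
    assert cur.fetchone()[0] == 43  # numbering picks up where the table left off
```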
diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py
index 672d5e481a11..d0e1abdb07b6 100644
--- a/awx/main/models/__init__.py
+++ b/awx/main/models/__init__.py
@@ -3,6 +3,7 @@
 
 # Django
 from django.conf import settings # noqa
+from django.db import connection, ProgrammingError
 from django.db.models.signals import pre_delete # noqa
 
 # AWX
@@ -79,6 +80,27 @@
 User.add_to_class('accessible_objects', user_accessible_objects)
 
 
+def enforce_bigint_pk_migration():
+    # see: https://github.com/ansible/awx/issues/6010
+    # look at all the event tables and verify that they have been fully migrated
+    # from the *old* int primary key table to the replacement bigint table
+    # if not, attempt to migrate them in the background
+    for tblname in (
+        'main_jobevent', 'main_inventoryupdateevent',
+        'main_projectupdateevent', 'main_adhoccommandevent',
+        'main_systemjobevent'
+    ):
+        with connection.cursor() as cursor:
+            try:
+                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname}')
+                if cursor.fetchone():
+                    from awx.main.tasks import migrate_legacy_event_data
+                    migrate_legacy_event_data.apply_async([tblname])
+            except ProgrammingError:
+                # the table is gone (migration is unnecessary)
+                pass
+
+
 def cleanup_created_modified_by(sender, **kwargs):
     # work around a bug in django-polymorphic that doesn't properly
     # handle cascades for reverse foreign keys on the polymorphic base model
diff --git a/awx/main/models/events.py b/awx/main/models/events.py
index eef46bebe3e7..d5cd2d76e754 100644
--- a/awx/main/models/events.py
+++ b/awx/main/models/events.py
@@ -438,6 +438,7 @@ class Meta:
             ('job', 'parent_uuid'),
         ]
 
+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     job = models.ForeignKey(
         'Job',
         related_name='job_events',
@@ -526,6 +527,7 @@ class Meta:
             ('project_update', 'end_line'),
         ]
 
+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     project_update = models.ForeignKey(
         'ProjectUpdate',
         related_name='project_update_events',
@@ -677,6 +679,7 @@ class Meta:
     FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
     EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
 
+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     event = models.CharField(
         max_length=100,
         choices=EVENT_CHOICES,
@@ -739,6 +742,7 @@ class Meta:
             ('inventory_update', 'end_line'),
         ]
 
+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     inventory_update = models.ForeignKey(
         'InventoryUpdate',
         related_name='inventory_update_events',
@@ -772,6 +776,7 @@ class Meta:
             ('system_job', 'end_line'),
         ]
 
+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     system_job = models.ForeignKey(
         'SystemJob',
         related_name='system_job_events',
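`enforce_bigint_pk_migration()` probes for the leftover `_old_*` table by querying it and treating `ProgrammingError` as "already cleaned up." An equivalent, exception-free probe — a sketch of an alternative, not what this patch ships — would ask the Postgres catalog directly:

```python
# alternative existence check: ask information_schema whether the leftover
# table is still present, rather than querying it and catching the error
from django.db import connection


def old_event_table_exists(tblname):
    with connection.cursor() as cursor:
        cursor.execute(
            'SELECT 1 FROM information_schema.tables WHERE table_name = %s;',
            [f'_old_{tblname}'],
        )
        return cursor.fetchone() is not None
```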
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index ddbd6a4e2b1c..cdce0318e21a 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -26,7 +26,7 @@
 
 # Django
 from django.conf import settings
-from django.db import transaction, DatabaseError, IntegrityError
+from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
 from django.db.models.fields.related import ForeignKey
 from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
@@ -59,7 +59,7 @@
     Inventory, InventorySource, SmartInventoryMembership,
     Job, AdHocCommand, ProjectUpdate, InventoryUpdate, SystemJob,
     JobEvent, ProjectUpdateEvent, InventoryUpdateEvent, AdHocCommandEvent, SystemJobEvent,
-    build_safe_env
+    build_safe_env, enforce_bigint_pk_migration
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.exceptions import AwxTaskError
@@ -135,6 +135,12 @@ def dispatch_startup():
     if Instance.objects.me().is_controller():
         awx_isolated_heartbeat()
 
+    # at process startup, detect the need to migrate old event records from int
+    # to bigint; at *some point* in the future, once certain versions of AWX
+    # and Tower fall out of use/support, we can probably just _assume_ that
+    # everybody has moved to bigint, and remove this code entirely
+    enforce_bigint_pk_migration()
+
 
 def inform_cluster_of_shutdown():
     try:
@@ -679,6 +685,48 @@ def update_host_smart_inventory_memberships():
         smart_inventory.update_computed_fields()
 
 
+@task(queue=get_local_queuename)
+def migrate_legacy_event_data(tblname):
+    if 'event' not in tblname:
+        return
+    with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
+        if acquired is False:
+            return
+        chunk = 1000000
+
+        def _remaining():
+            try:
+                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname};')
+                return cursor.fetchone()[0]
+            except ProgrammingError:
+                # the table is gone (migration is unnecessary)
+                return None
+
+        with connection.cursor() as cursor:
+            total_rows = _remaining()
+            while total_rows:
+                with transaction.atomic():
+                    cursor.execute(
+                        f'INSERT INTO {tblname} SELECT * FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;'
+                    )
+                    last_insert_pk = cursor.fetchone()
+                    if last_insert_pk is None:
+                        # this means that the SELECT from the old table was
+                        # empty, and there was nothing to insert (so we're done)
+                        break
+                    last_insert_pk = last_insert_pk[0]
+                    cursor.execute(
+                        f'DELETE FROM _old_{tblname} WHERE id IN (SELECT id FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk});'
+                    )
+                logger.warn(
+                    f'migrated int -> bigint rows to {tblname} from _old_{tblname}; ({last_insert_pk} rows remaining)'
+                )
+
+            if _remaining() is None:
+                cursor.execute(f'DROP TABLE IF EXISTS _old_{tblname}')
+                logger.warn(f'{tblname} primary key migration to bigint has finished')
+
+
 @task(queue=get_local_queuename)
 def delete_inventory(inventory_id, user_id, retries=5):
     # Delete inventory as user
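The worker task above is the heart of the online migration: a non-blocking advisory lock ensures only one dispatcher runs it per table, rows are copied newest-first so the most recent events reappear in the UI soonest, and each chunk commits in its own transaction so a restart loses at most one chunk of progress. Judging by its usage, `advisory_lock(..., wait=False)` is a django-pglocks-style context manager; the standalone sketch below substitutes a raw `pg_try_advisory_lock()` call, and the DSN, table names, and chunk size are hypothetical:

```python
import psycopg2

CHUNK = 100_000  # assumed batch size for the illustration


def migrate(dsn, src='_old_main_jobevent', dest='main_jobevent'):
    conn = psycopg2.connect(dsn)
    with conn.cursor() as cur:
        # hashtext() maps the lock name onto the advisory-lock key space;
        # the try-only variant returns immediately instead of blocking
        cur.execute('SELECT pg_try_advisory_lock(hashtext(%s));', [src])
        if not cur.fetchone()[0]:
            return  # another process already owns this migration
    while True:
        with conn, conn.cursor() as cur:  # one transaction per chunk
            cur.execute(
                f'INSERT INTO {dest} SELECT * FROM {src} '
                f'ORDER BY id DESC LIMIT {CHUNK};'
            )
            if cur.rowcount == 0:
                break  # source table is empty: nothing left to move
            cur.execute(
                f'DELETE FROM {src} WHERE id IN '
                f'(SELECT id FROM {src} ORDER BY id DESC LIMIT {CHUNK});'
            )
    with conn, conn.cursor() as cur:
        cur.execute(f'DROP TABLE IF EXISTS {src};')
```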
From 301d6ff616c1728aa40c0a5357e781eb59903c5b Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Fri, 27 Mar 2020 09:28:10 -0400
Subject: [PATCH 2/2] make the job event bigint migration chunk size
 configurable

---
 awx/main/tasks.py        | 2 +-
 awx/settings/defaults.py | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index cdce0318e21a..1f82b8ab9233 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -692,7 +692,7 @@ def migrate_legacy_event_data(tblname):
     with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
         if acquired is False:
             return
-        chunk = 1000000
+        chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE
 
         def _remaining():
             try:
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 6989090dbc62..b254f70ddead 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -197,6 +197,9 @@ def IS_TESTING(argv=None):
 # The maximum size of the job event worker queue before requests are blocked
 JOB_EVENT_MAX_QUEUE_SIZE = 10000
 
+# The number of job events to migrate per-transaction when moving from int -> bigint
+JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
+
 # Disallow sending session cookies over insecure connections
 SESSION_COOKIE_SECURE = True
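With the second patch applied, operators can tune the trade-off: smaller chunks mean shorter transactions and less WAL generated per commit, at the cost of a longer overall migration. A hypothetical local-settings override (the conf.d path is typical for Tower-era installs, but varies by deployment):

```python
# e.g. /etc/tower/conf.d/bigint_migration.py (path is an assumption)
# halve the per-transaction batch to reduce lock time and WAL spikes
JOB_EVENT_MIGRATION_CHUNK_SIZE = 500000
```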