Merge pull request #6032 from ryanpetrello/bigint
migrate event table primary keys from integer to bigint

Reviewed-by: Ryan Petrello <https://github.com/ryanpetrello>
softwarefactory-project-zuul[bot] committed Mar 27, 2020
2 parents c4ed9a1 + 301d6ff commit 155a1d9
Showing 6 changed files with 203 additions and 3 deletions.
6 changes: 5 additions & 1 deletion awx/main/dispatch/worker/base.py
@@ -101,7 +101,6 @@ def process_task(self, body):
     def run(self, *args, **kwargs):
         signal.signal(signal.SIGINT, self.stop)
         signal.signal(signal.SIGTERM, self.stop)
-        self.worker.on_start()

         # Child should implement other things here
@@ -115,6 +114,7 @@ def stop(self, signum, frame):
 class AWXConsumerRedis(AWXConsumerBase):
     def run(self, *args, **kwargs):
         super(AWXConsumerRedis, self).run(*args, **kwargs)
+        self.worker.on_start()

         queue = redis.Redis.from_url(settings.BROKER_URL)
         while True:
@@ -130,12 +130,16 @@ def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)

         logger.warn(f"Running worker {self.name} listening to queues {self.queues}")
+        init = False

         while True:
             try:
                 with pg_bus_conn() as conn:
                     for queue in self.queues:
                         conn.listen(queue)
+                    if init is False:
+                        self.worker.on_start()
+                        init = True
                     for e in conn.events():
                         self.process_task(json.loads(e.payload))
                     if self.should_stop:
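The net effect of these hunks is that worker.on_start() no longer fires in AWXConsumerBase.run() before any subscription exists: the Redis consumer now calls it right after its base setup, and the PG consumer calls it exactly once, only after conn.listen() has subscribed to every queue. A plausible reading (inferred from the rest of this PR, not stated in the commit) is that on_start() can itself dispatch work — such as the bigint migration tasks added below — which would be lost if the consumer were not yet listening. A minimal illustrative sketch of that ordering, not AWX code:

def run_consumer(conn, queues, worker, process_task):
    # subscribe first, then run the startup hook exactly once, so any tasks
    # the hook enqueues are delivered to an already-listening consumer
    started = False
    while True:
        for q in queues:
            conn.listen(q)
        if not started:
            worker.on_start()
            started = True
        for event in conn.events():
            process_task(event)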
118 changes: 118 additions & 0 deletions awx/main/migrations/0113_v370_event_bigint.py
@@ -0,0 +1,118 @@
# Generated by Django 2.2.8 on 2020-02-21 16:31

from django.db import migrations, models, connection


def migrate_event_data(apps, schema_editor):
    # see: https://github.com/ansible/awx/issues/6010
    #
    # the goal of this function is to end with event tables (e.g., main_jobevent)
    # that have a bigint primary key (because the old usage of an integer
    # numeric isn't enough, as its range is about 2.1B, see:
    # https://www.postgresql.org/docs/9.1/datatype-numeric.html)

    # unfortunately, we can't do this with a simple ALTER TABLE, because
    # for tables with hundreds of millions or billions of rows, the ALTER TABLE
    # can take *hours* on modest hardware.
    #
    # the approach in this migration means that post-migration, event data will
    # *not* immediately show up, but will be repopulated over time progressively
    # the trade-off here is not having to wait hours for the full data migration
    # before you can start and run AWX again (including new playbook runs)
    for tblname in (
        'main_jobevent', 'main_inventoryupdateevent',
        'main_projectupdateevent', 'main_adhoccommandevent',
        'main_systemjobevent'
    ):
        with connection.cursor() as cursor:
            # rename the current event table
            cursor.execute(
                f'ALTER TABLE {tblname} RENAME TO _old_{tblname};'
            )
            # create a *new* table with the same schema
            cursor.execute(
                f'CREATE TABLE {tblname} (LIKE _old_{tblname} INCLUDING ALL);'
            )
            # alter the *new* table so that the primary key is a big int
            cursor.execute(
                f'ALTER TABLE {tblname} ALTER COLUMN id TYPE bigint USING id::bigint;'
            )

            # recreate counter for the new table's primary key to
            # start where the *old* table left off (we have to do this because the
            # counter changed from an int to a bigint)
            cursor.execute(f'DROP SEQUENCE IF EXISTS "{tblname}_id_seq" CASCADE;')
            cursor.execute(f'CREATE SEQUENCE "{tblname}_id_seq";')
            cursor.execute(
                f'ALTER TABLE "{tblname}" ALTER COLUMN "id" '
                f"SET DEFAULT nextval('{tblname}_id_seq');"
            )
            cursor.execute(
                f"SELECT setval('{tblname}_id_seq', (SELECT MAX(id) FROM _old_{tblname}), true);"
            )

            # replace the BTREE index on main_jobevent.job_id with
            # a BRIN index to drastically improve per-UJ lookup performance
            # see: https://info.crunchydata.com/blog/postgresql-brin-indexes-big-data-performance-with-minimal-storage
            if tblname == 'main_jobevent':
                cursor.execute("SELECT indexname FROM pg_indexes WHERE tablename='main_jobevent' AND indexdef LIKE '%USING btree (job_id)';")
                old_index = cursor.fetchone()[0]
                cursor.execute(f'DROP INDEX {old_index}')
                cursor.execute('CREATE INDEX main_jobevent_job_id_brin_idx ON main_jobevent USING brin (job_id);')

            # remove all of the indexes and constraints from the old table
            # (they just slow down the data migration)
            cursor.execute(f"SELECT indexname, indexdef FROM pg_indexes WHERE tablename='_old_{tblname}' AND indexname != '{tblname}_pkey';")
            indexes = cursor.fetchall()

            cursor.execute(f"SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = '_old_{tblname}'::regclass AND conname != '{tblname}_pkey';")
            constraints = cursor.fetchall()

            for indexname, indexdef in indexes:
                cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
            for conname, contype, condef in constraints:
                cursor.execute(f'ALTER TABLE _old_{tblname} DROP CONSTRAINT IF EXISTS {conname}')


class FakeAlterField(migrations.AlterField):

    def database_forwards(self, *args):
        # this is intentionally left blank, because we're
        # going to accomplish the migration with some custom raw SQL
        pass


class Migration(migrations.Migration):

    dependencies = [
        ('main', '0112_v370_workflow_node_identifier'),
    ]

    operations = [
        migrations.RunPython(migrate_event_data),
        FakeAlterField(
            model_name='adhoccommandevent',
            name='id',
            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
        ),
        FakeAlterField(
            model_name='inventoryupdateevent',
            name='id',
            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
        ),
        FakeAlterField(
            model_name='jobevent',
            name='id',
            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
        ),
        FakeAlterField(
            model_name='projectupdateevent',
            name='id',
            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
        ),
        FakeAlterField(
            model_name='systemjobevent',
            name='id',
            field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
        ),
    ]
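After this migration runs, the swap can be sanity-checked from a Django shell. A hedged verification sketch (these queries are not part of the commit; the expected values follow from the SQL above):

from django.db import connection

with connection.cursor() as cursor:
    # the replacement table should report a 64-bit primary key
    cursor.execute(
        "SELECT data_type FROM information_schema.columns "
        "WHERE table_name = 'main_jobevent' AND column_name = 'id';"
    )
    print(cursor.fetchone()[0])  # expected: bigint

    # the recreated sequence should have resumed from the old table's MAX(id)
    cursor.execute("SELECT last_value FROM main_jobevent_id_seq;")
    print(cursor.fetchone()[0])

    # job_id lookups should now be served by the BRIN index
    cursor.execute(
        "SELECT indexname FROM pg_indexes WHERE tablename = 'main_jobevent' "
        "AND indexdef LIKE '%USING brin%';"
    )
    print(cursor.fetchone())  # expected: ('main_jobevent_job_id_brin_idx',)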
22 changes: 22 additions & 0 deletions awx/main/models/__init__.py
@@ -3,6 +3,7 @@

 # Django
 from django.conf import settings # noqa
+from django.db import connection, ProgrammingError
 from django.db.models.signals import pre_delete # noqa

 # AWX
@@ -79,6 +80,27 @@
 User.add_to_class('accessible_objects', user_accessible_objects)


+def enforce_bigint_pk_migration():
+    # see: https://github.com/ansible/awx/issues/6010
+    # look at all the event tables and verify that they have been fully migrated
+    # from the *old* int primary key table to the replacement bigint table
+    # if not, attempt to migrate them in the background
+    for tblname in (
+        'main_jobevent', 'main_inventoryupdateevent',
+        'main_projectupdateevent', 'main_adhoccommandevent',
+        'main_systemjobevent'
+    ):
+        with connection.cursor() as cursor:
+            try:
+                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname}')
+                if cursor.fetchone():
+                    from awx.main.tasks import migrate_legacy_event_data
+                    migrate_legacy_event_data.apply_async([tblname])
+            except ProgrammingError:
+                # the table is gone (migration is unnecessary)
+                pass
+
+
 def cleanup_created_modified_by(sender, **kwargs):
     # work around a bug in django-polymorphic that doesn't properly
     # handle cascades for reverse foreign keys on the polymorphic base model
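Because this check is driven entirely by the presence of the _old_* tables (and the task it enqueues takes an advisory lock, per tasks.py below), it appears safe to invoke by hand. A hedged usage sketch, assuming a running dispatcher to consume the enqueued tasks:

# e.g. from a Django shell on an AWX node
from awx.main.models import enforce_bigint_pk_migration

# enqueues migrate_legacy_event_data for any event table whose _old_*
# counterpart still exists; effectively a no-op once they are all dropped
enforce_bigint_pk_migration()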
5 changes: 5 additions & 0 deletions awx/main/models/events.py
@@ -438,6 +438,7 @@ class Meta:
             ('job', 'parent_uuid'),
         ]

+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     job = models.ForeignKey(
         'Job',
         related_name='job_events',
@@ -526,6 +527,7 @@ class Meta:
             ('project_update', 'end_line'),
         ]

+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     project_update = models.ForeignKey(
         'ProjectUpdate',
         related_name='project_update_events',
@@ -677,6 +679,7 @@ class Meta:
     FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
     EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]

+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     event = models.CharField(
         max_length=100,
         choices=EVENT_CHOICES,
@@ -739,6 +742,7 @@ class Meta:
             ('inventory_update', 'end_line'),
         ]

+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     inventory_update = models.ForeignKey(
         'InventoryUpdate',
         related_name='inventory_update_events',
@@ -772,6 +776,7 @@ class Meta:
             ('system_job', 'end_line'),
         ]

+    id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')
     system_job = models.ForeignKey(
         'SystemJob',
         related_name='system_job_events',
52 changes: 50 additions & 2 deletions awx/main/tasks.py
Expand Up @@ -26,7 +26,7 @@

# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError
from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str
@@ -59,7 +59,7 @@
     Inventory, InventorySource, SmartInventoryMembership,
     Job, AdHocCommand, ProjectUpdate, InventoryUpdate, SystemJob,
     JobEvent, ProjectUpdateEvent, InventoryUpdateEvent, AdHocCommandEvent, SystemJobEvent,
-    build_safe_env
+    build_safe_env, enforce_bigint_pk_migration
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.exceptions import AwxTaskError
@@ -135,6 +135,12 @@ def dispatch_startup():
     if Instance.objects.me().is_controller():
         awx_isolated_heartbeat()

+    # at process startup, detect the need to migrate old event records from int
+    # to bigint; at *some point* in the future, once certain versions of AWX
+    # and Tower fall out of use/support, we can probably just _assume_ that
+    # everybody has moved to bigint, and remove this code entirely
+    enforce_bigint_pk_migration()
+

 def inform_cluster_of_shutdown():
     try:
@@ -679,6 +685,48 @@ def update_host_smart_inventory_memberships():
     smart_inventory.update_computed_fields()


+@task(queue=get_local_queuename)
+def migrate_legacy_event_data(tblname):
+    if 'event' not in tblname:
+        return
+    with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
+        if acquired is False:
+            return
+        chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE
+
+        def _remaining():
+            try:
+                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname};')
+                return cursor.fetchone()[0]
+            except ProgrammingError:
+                # the table is gone (migration is unnecessary)
+                return None
+
+        with connection.cursor() as cursor:
+            total_rows = _remaining()
+            while total_rows:
+                with transaction.atomic():
+                    cursor.execute(
+                        f'INSERT INTO {tblname} SELECT * FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;'
+                    )
+                    last_insert_pk = cursor.fetchone()
+                    if last_insert_pk is None:
+                        # this means that the SELECT from the old table was
+                        # empty, and there was nothing to insert (so we're done)
+                        break
+                    last_insert_pk = last_insert_pk[0]
+                    cursor.execute(
+                        f'DELETE FROM _old_{tblname} WHERE id IN (SELECT id FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk});'
+                    )
+                    logger.warn(
+                        f'migrated int -> bigint rows to {tblname} from _old_{tblname}; # ({last_insert_pk} rows remaining)'
+                    )
+
+            if _remaining() is None:
+                cursor.execute(f'DROP TABLE IF EXISTS _old_{tblname}')
+                logger.warn(f'{tblname} primary key migration to bigint has finished')
+
+
 @task(queue=get_local_queuename)
 def delete_inventory(inventory_id, user_id, retries=5):
     # Delete inventory as user
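Two details of this loop are worth noting: rows are copied newest-first (ORDER BY id DESC), so the most recent job output should reappear soonest, and each chunk is copied and deleted inside one transaction, so an interrupted worker loses at most one chunk of progress. As a rough, illustrative cost model (the row count here is assumed, not from the commit):

import math

total_rows = 500_000_000  # hypothetical main_jobevent backlog
chunk = 1_000_000         # JOB_EVENT_MIGRATION_CHUNK_SIZE default
print(math.ceil(total_rows / chunk))  # -> 500 copy+delete transactions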
3 changes: 3 additions & 0 deletions awx/settings/defaults.py
@@ -197,6 +197,9 @@ def IS_TESTING(argv=None):
 # The maximum size of the job event worker queue before requests are blocked
 JOB_EVENT_MAX_QUEUE_SIZE = 10000

+# The number of job events to migrate per-transaction when moving from int -> bigint
+JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
+
 # Disallow sending session cookies over insecure connections
 SESSION_COOKIE_SECURE = True

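Because this lives in defaults.py, it should be overridable like any other AWX setting. A hypothetical local override (path and value are illustrative), trading fewer round trips for shorter per-batch transactions and locks:

# e.g. in a local settings file such as /etc/tower/conf.d/bigint_migration.py
JOB_EVENT_MIGRATION_CHUNK_SIZE = 250000  # smaller batches, shorter transactions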
