From 92857fad89052dc7f1a4f17acc305862813c5fde Mon Sep 17 00:00:00 2001 From: Hielke Date: Wed, 28 Sep 2022 15:34:16 +0200 Subject: [PATCH] Separated aggregation removal --- ...ate.py => complete_aggregate_cmsa15min.py} | 20 +-------- .../commands/remove_aggregate_cmsa15min.py | 29 +++++++++++++ tests/test_continuousaggregate.py | 42 +++++++++++++++---- tests/test_peoplemeasurement.py | 2 +- 4 files changed, 66 insertions(+), 27 deletions(-) rename src/continuousaggregate/management/commands/{complete_aggregate.py => complete_aggregate_cmsa15min.py} (69%) create mode 100644 src/continuousaggregate/management/commands/remove_aggregate_cmsa15min.py diff --git a/src/continuousaggregate/management/commands/complete_aggregate.py b/src/continuousaggregate/management/commands/complete_aggregate_cmsa15min.py similarity index 69% rename from src/continuousaggregate/management/commands/complete_aggregate.py rename to src/continuousaggregate/management/commands/complete_aggregate_cmsa15min.py index d162cdf..ce23b0f 100644 --- a/src/continuousaggregate/management/commands/complete_aggregate.py +++ b/src/continuousaggregate/management/commands/complete_aggregate_cmsa15min.py @@ -11,29 +11,11 @@ @transaction.atomic class Command(BaseCommand): - def add_arguments(self, parser): - parser.add_argument('table_name', type=str) parser.add_argument('--recalculate-history', default=False) parser.add_argument('--recalculate-history-since', type=date.fromisoformat) def handle(self, *args, **options): - table_name = options['table_name'] - recalculate_history = options['recalculate_history'] - recalculate_history_since = options['recalculate_history_since'] - - if recalculate_history: - self.stdout.write(f"Start deleting full aggregation table {Cmsa15Min._meta.db_table}") - Cmsa15Min.objects.all().delete() - self.stdout.write(f"Finished deleting full aggregation table {Cmsa15Min._meta.db_table}") - - if recalculate_history_since: - self.stdout.write( - f"Start deleting aggregation table {Cmsa15Min._meta.db_table} since {recalculate_history_since}") - Cmsa15Min.objects.filter(timestamp_rounded__gt=recalculate_history_since).delete() - self.stdout.write( - f"Finished deleting aggregation table {Cmsa15Min._meta.db_table} since {recalculate_history_since}") - # calculate run_id based on the latest run_id from execution_log table with connection.cursor() as cursor: self.stdout.write(f"Start calculate next value for run_id based on execution_log table ") @@ -53,7 +35,7 @@ def handle(self, *args, **options): source_table := 'vw_cmsa_15min_v01_aggregate', process_schema := 'prc', target_schema := 'public', - target_table := '{table_name}', + target_table := '{Cmsa15Min._meta.db_table}', process_type := 'IU', implicit_deletes := false, run_id := {run_id}, diff --git a/src/continuousaggregate/management/commands/remove_aggregate_cmsa15min.py b/src/continuousaggregate/management/commands/remove_aggregate_cmsa15min.py new file mode 100644 index 0000000..f7f3d69 --- /dev/null +++ b/src/continuousaggregate/management/commands/remove_aggregate_cmsa15min.py @@ -0,0 +1,29 @@ +import logging +from datetime import date + +from django.core.management.base import BaseCommand +from django.db import connection, transaction + +from continuousaggregate.models import Cmsa15Min + +log = logging.getLogger(__name__) + + +@transaction.atomic +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument('--since', type=date.fromisoformat) + + def handle(self, *args, **options): + since = options['since'] + + since_log = '' + if since: + q = Cmsa15Min.objects.filter(timestamp_rounded__gt=since) + since_log = since + else: + q = Cmsa15Min.objects.all() + + self.stdout.write(f"Start deleting records in aggregation table {Cmsa15Min._meta.db_table} since {since_log}") + q.delete() + self.stdout.write(f"Finished deleting records in aggregation table {Cmsa15Min._meta.db_table} since {since_log}") diff --git a/tests/test_continuousaggregate.py b/tests/test_continuousaggregate.py index 63d4786..32f4630 100644 --- a/tests/test_continuousaggregate.py +++ b/tests/test_continuousaggregate.py @@ -59,7 +59,7 @@ def test_vanilla(self, client): assert Observation.objects.all().count() > 100 # Run the aggregator - call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min') + call_man_command('complete_aggregate_cmsa15min') # Do we have any records in the continuous aggregate table? assert Cmsa15Min.objects.all().count() > 500 @@ -73,18 +73,36 @@ def test_vanilla(self, client): .first() assert middle_record.basedonxmessages == 3 - def test_aggregate_recalculation(self, client): + def test_aggregate_full_recalculation(self, client): today = self.add_test_records(client, test_days=3) # Run the aggregator - call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min') + call_man_command('complete_aggregate_cmsa15min') - # Remove underlying source data - Observation.objects.all().delete() + # Remove the aggregate + call_man_command('remove_aggregate_cmsa15min') - # Recalculate the history since yesterday + assert Cmsa15Min.objects.all().count() == 0 + + # Run the aggregator again + call_man_command('complete_aggregate_cmsa15min') + + # Check the last aggregation record again + last_record = Cmsa15Min.objects \ + .filter(sensor=self.sensor_names[0]) \ + .order_by('-timestamp_rounded') \ + .first() + assert today == last_record.timestamp_rounded.date() + + def test_aggregate_recalculation_since_date(self, client): + today = self.add_test_records(client, test_days=3) + + # Run the aggregator + call_man_command('complete_aggregate_cmsa15min') + + # Remove the aggregate since yesterday a_day_ago = (date.today() - timedelta(days=1)).isoformat() - call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min', f'--recalculate-history-since={a_day_ago}') + call_man_command('remove_aggregate_cmsa15min', f'--since={a_day_ago}') # We removed source records from yesterday onwards. That means we now expect the # latest date in the continuous aggregate to be the day before yesterday @@ -93,3 +111,13 @@ def test_aggregate_recalculation(self, client): .order_by('-timestamp_rounded') \ .first() assert today - timedelta(days=2) == last_record.timestamp_rounded.date() + + # Run the aggregator again + call_man_command('complete_aggregate_cmsa15min') + + # Check the last aggregation record again + last_record = Cmsa15Min.objects \ + .filter(sensor=self.sensor_names[0]) \ + .order_by('-timestamp_rounded') \ + .first() + assert today == last_record.timestamp_rounded.date() diff --git a/tests/test_peoplemeasurement.py b/tests/test_peoplemeasurement.py index 304d02e..a37b8db 100644 --- a/tests/test_peoplemeasurement.py +++ b/tests/test_peoplemeasurement.py @@ -100,7 +100,7 @@ def test_get_15min_aggregation_timezone_with_both_v1_and_v2_records(self): parser.consume(end_at_empty_queue=True) # Complete aggregate because the query in the endpoint depends on it - call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min') + call_man_command('complete_aggregate_cmsa15min') # test whether the endpoint responds correctly response = self.client.get(self.URL, **GET_AUTHORIZATION_HEADER)