Skip to content
This repository has been archived by the owner on Sep 10, 2024. It is now read-only.

Commit

Permalink
Separated aggregation removal
Browse files Browse the repository at this point in the history
  • Loading branch information
Hielke committed Sep 28, 2022
1 parent f2cd7fb commit 92857fa
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,11 @@

@transaction.atomic
class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument('table_name', type=str)
parser.add_argument('--recalculate-history', default=False)
parser.add_argument('--recalculate-history-since', type=date.fromisoformat)

def handle(self, *args, **options):
table_name = options['table_name']
recalculate_history = options['recalculate_history']
recalculate_history_since = options['recalculate_history_since']

if recalculate_history:
self.stdout.write(f"Start deleting full aggregation table {Cmsa15Min._meta.db_table}")
Cmsa15Min.objects.all().delete()
self.stdout.write(f"Finished deleting full aggregation table {Cmsa15Min._meta.db_table}")

if recalculate_history_since:
self.stdout.write(
f"Start deleting aggregation table {Cmsa15Min._meta.db_table} since {recalculate_history_since}")
Cmsa15Min.objects.filter(timestamp_rounded__gt=recalculate_history_since).delete()
self.stdout.write(
f"Finished deleting aggregation table {Cmsa15Min._meta.db_table} since {recalculate_history_since}")

# calculate run_id based on the latest run_id from execution_log table
with connection.cursor() as cursor:
self.stdout.write(f"Start calculate next value for run_id based on execution_log table ")
Expand All @@ -53,7 +35,7 @@ def handle(self, *args, **options):
source_table := 'vw_cmsa_15min_v01_aggregate',
process_schema := 'prc',
target_schema := 'public',
target_table := '{table_name}',
target_table := '{Cmsa15Min._meta.db_table}',
process_type := 'IU',
implicit_deletes := false,
run_id := {run_id},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
from datetime import date

from django.core.management.base import BaseCommand
from django.db import connection, transaction

from continuousaggregate.models import Cmsa15Min

log = logging.getLogger(__name__)


@transaction.atomic
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--since', type=date.fromisoformat)

def handle(self, *args, **options):
since = options['since']

since_log = ''
if since:
q = Cmsa15Min.objects.filter(timestamp_rounded__gt=since)
since_log = since
else:
q = Cmsa15Min.objects.all()

self.stdout.write(f"Start deleting records in aggregation table {Cmsa15Min._meta.db_table} since {since_log}")
q.delete()
self.stdout.write(f"Finished deleting records in aggregation table {Cmsa15Min._meta.db_table} since {since_log}")
42 changes: 35 additions & 7 deletions tests/test_continuousaggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_vanilla(self, client):
assert Observation.objects.all().count() > 100

# Run the aggregator
call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min')
call_man_command('complete_aggregate_cmsa15min')

# Do we have any records in the continuous aggregate table?
assert Cmsa15Min.objects.all().count() > 500
Expand All @@ -73,18 +73,36 @@ def test_vanilla(self, client):
.first()
assert middle_record.basedonxmessages == 3

def test_aggregate_recalculation(self, client):
def test_aggregate_full_recalculation(self, client):
today = self.add_test_records(client, test_days=3)

# Run the aggregator
call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min')
call_man_command('complete_aggregate_cmsa15min')

# Remove underlying source data
Observation.objects.all().delete()
# Remove the aggregate
call_man_command('remove_aggregate_cmsa15min')

# Recalculate the history since yesterday
assert Cmsa15Min.objects.all().count() == 0

# Run the aggregator again
call_man_command('complete_aggregate_cmsa15min')

# Check the last aggregation record again
last_record = Cmsa15Min.objects \
.filter(sensor=self.sensor_names[0]) \
.order_by('-timestamp_rounded') \
.first()
assert today == last_record.timestamp_rounded.date()

def test_aggregate_recalculation_since_date(self, client):
today = self.add_test_records(client, test_days=3)

# Run the aggregator
call_man_command('complete_aggregate_cmsa15min')

# Remove the aggregate since yesterday
a_day_ago = (date.today() - timedelta(days=1)).isoformat()
call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min', f'--recalculate-history-since={a_day_ago}')
call_man_command('remove_aggregate_cmsa15min', f'--since={a_day_ago}')

# We removed source records from yesterday onwards. That means we now expect the
# latest date in the continuous aggregate to be the day before yesterday
Expand All @@ -93,3 +111,13 @@ def test_aggregate_recalculation(self, client):
.order_by('-timestamp_rounded') \
.first()
assert today - timedelta(days=2) == last_record.timestamp_rounded.date()

# Run the aggregator again
call_man_command('complete_aggregate_cmsa15min')

# Check the last aggregation record again
last_record = Cmsa15Min.objects \
.filter(sensor=self.sensor_names[0]) \
.order_by('-timestamp_rounded') \
.first()
assert today == last_record.timestamp_rounded.date()
2 changes: 1 addition & 1 deletion tests/test_peoplemeasurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_get_15min_aggregation_timezone_with_both_v1_and_v2_records(self):
parser.consume(end_at_empty_queue=True)

# Complete aggregate because the query in the endpoint depends on it
call_man_command('complete_aggregate', 'continuousaggregate_cmsa15min')
call_man_command('complete_aggregate_cmsa15min')

# test whether the endpoint responds correctly
response = self.client.get(self.URL, **GET_AUTHORIZATION_HEADER)
Expand Down

0 comments on commit 92857fa

Please sign in to comment.