From 96a9991ab142774593d416b770d514137ba91b0c Mon Sep 17 00:00:00 2001 From: Marina Samuel Date: Fri, 10 May 2019 12:32:19 -0400 Subject: [PATCH] Reprocess tables with empty samples sooner. --- redash/settings/__init__.py | 7 +++- redash/tasks/queries.py | 35 ++++++++++++----- tests/tasks/test_refresh_schemas.py | 60 ++++++++++++++++++++++++++--- 3 files changed, 86 insertions(+), 16 deletions(-) diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index e51aba1543..e38746882a 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -334,8 +334,11 @@ def email_server_is_configured(): # Frequency of clearing out old schema metadata. SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60)) -# Frequency of schema samples refresh -SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 14)) +# Frequency of schema samples updates +SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS", 14)) + +# Frequency of schema samples refresh when no samples are stored +SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 2)) # kylin KYLIN_OFFSET = int(os.environ.get('REDASH_KYLIN_OFFSET', 0)) diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index bd4a688b25..76f7b47d98 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -8,6 +8,7 @@ from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded from celery.result import AsyncResult from celery.utils.log import get_task_logger +from dateutil import parser from six import text_type from sqlalchemy.orm import load_only from sqlalchemy import or_ @@ -244,14 +245,35 @@ def truncate_long_string(original_str, max_length): @celery.task(name="redash.tasks.update_sample") -def update_sample(data_source_id, table_name, table_id): +def update_sample(data_source_id, table_name, table_id, sample_updated_at): """ - For a given table, find look up a sample row for it and update + For a given table, look up a sample row for it and update the "example" fields for it in the column_metadata table. """ logger.info(u"task=update_sample state=start table_name=%s", table_name) start_time = time.time() ds = models.DataSource.get_by_id(data_source_id) + + persisted_columns = ColumnMetadata.query.filter( + ColumnMetadata.exists.is_(True), + ColumnMetadata.table_id == table_id, + ).options(load_only('id', 'name', 'example')) + + DAYS_AGO = ( + utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS)) + + first_column = persisted_columns.first() + + if (first_column and + sample_updated_at and + first_column.example and + parser.parse(sample_updated_at) > DAYS_AGO): + # Look at the first example in the persisted columns. + # If this is *not* empty AND sample_updated_at is recent, don't update sample + logger.info(u"task=update_sample state=abort - recent sample exists table_name=%s", + table_name) + return + sample = None try: sample = ds.query_runner.get_table_sample(table_name) @@ -261,14 +283,9 @@ def update_sample(data_source_id, table_name, table_id): if not sample: return - persisted_columns = ColumnMetadata.query.filter( - ColumnMetadata.exists.is_(True), - ColumnMetadata.table_id == table_id, - ).options(load_only('id')).all() - # If a column exists, add a sample to it. column_examples = [] - for persisted_column in persisted_columns: + for persisted_column in persisted_columns.all(): column_example = sample.get(persisted_column.name, None) column_example = column_example if isinstance( column_example, unicode) else str(column_example) @@ -318,7 +335,7 @@ def refresh_samples(data_source_id, table_sample_limit): for table in tables_to_sample: tasks.append( update_sample.signature( - args=(ds.id, table.name, table.id), + args=(ds.id, table.name, table.id, table.sample_updated_at), queue=settings.SCHEMAS_REFRESH_QUEUE ) ) diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py index 1bcb3b14b2..419f55676b 100644 --- a/tests/tasks/test_refresh_schemas.py +++ b/tests/tasks/test_refresh_schemas.py @@ -42,9 +42,9 @@ def setUp(self): self.patched_get_schema.return_value = self.default_schema_return_value get_table_sample_patcher = patch('redash.query_runner.BaseQueryRunner.get_table_sample') - patched_get_table_sample = get_table_sample_patcher.start() + self.patched_get_table_sample = get_table_sample_patcher.start() self.addCleanup(get_table_sample_patcher.stop) - patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE} + self.patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE} def tearDown(self): self.factory.data_source.query_runner.configuration['samples'] = False @@ -87,7 +87,8 @@ def test_refresh_schema_creates_tables(self): update_sample( self.factory.data_source.id, 'table', - 1 + 1, + "2019-05-09T17:07:52.386910Z" ) table_metadata = TableMetadata.query.all() column_metadata = ColumnMetadata.query.all() @@ -182,7 +183,8 @@ def test_refresh_schema_update_column(self): update_sample( self.factory.data_source.id, 'table', - 1 + 1, + "2019-05-09T17:07:52.386910Z" ) column_metadata = ColumnMetadata.query.all() self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA) @@ -292,7 +294,8 @@ def test_refresh_schema_doesnt_overwrite_samples(self): update_sample( self.factory.data_source.id, 'table', - 1 + 1, + "2019-05-09T17:07:52.386910Z" ) column_metadata = ColumnMetadata.query.first() self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE) @@ -317,3 +320,50 @@ def test_refresh_samples_applied_to_one_data_source(self): TableMetadata.sample_updated_at.isnot(None) ) self.assertEqual(table_metadata.count(), len(self.default_schema_return_value)) + + def test_recent_empty_sample_refreshs(self): + self.factory.data_source.query_runner.configuration['samples'] = True + refresh_schema(self.factory.data_source.id) + + # Confirm no sample exists + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, None) + + LAST_UPDATE = utils.utcnow() - datetime.timedelta(days=5) + update_sample( + self.factory.data_source.id, + 'table', + 1, + LAST_UPDATE.isoformat() + ) + + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE) + + def test_recent_non_empty_sample_doesnt_refresh(self): + self.factory.data_source.query_runner.configuration['samples'] = True + refresh_schema(self.factory.data_source.id) + + update_sample( + self.factory.data_source.id, + 'table', + 1, + None + ) + + # Confirm a sample was added + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE) + + self.patched_get_table_sample.return_value = {self.COLUMN_NAME: "a new example"} + LAST_UPDATE = utils.utcnow() - datetime.timedelta(days=5) + update_sample( + self.factory.data_source.id, + 'table', + 1, + LAST_UPDATE.isoformat() + ) + + # The sample doesn't take on the new value that is returned. + column_metadata = ColumnMetadata.query.first() + self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)