Skip to content

Commit

Permalink
Reprocess tables with empty samples sooner.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marina Samuel authored and Allen Short committed Jun 27, 2019
1 parent 156ecb0 commit 4d800d8
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 16 deletions.
7 changes: 5 additions & 2 deletions redash/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,11 @@ def email_server_is_configured():
# Frequency of clearing out old schema metadata.
SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60))

# Frequency of schema samples refresh
SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 14))
# Minimum age (in days) of a non-empty schema sample before it is updated again
SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS", 14))

# Frequency of schema samples refresh when no samples are stored
SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 2))

# kylin
KYLIN_OFFSET = int(os.environ.get('REDASH_KYLIN_OFFSET', 0))
Expand Down
35 changes: 26 additions & 9 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from dateutil import parser
from six import text_type
from sqlalchemy.orm import load_only
from sqlalchemy import or_
Expand Down Expand Up @@ -256,14 +257,35 @@ def truncate_long_string(original_str, max_length):


@celery.task(name="redash.tasks.update_sample")
def update_sample(data_source_id, table_name, table_id):
def update_sample(data_source_id, table_name, table_id, sample_updated_at):
"""
For a given table, find look up a sample row for it and update
For a given table, look up a sample row for it and update
the "example" fields for it in the column_metadata table.
"""
logger.info(u"task=update_sample state=start table_name=%s", table_name)
start_time = time.time()
ds = models.DataSource.get_by_id(data_source_id)

persisted_columns = ColumnMetadata.query.filter(
ColumnMetadata.exists.is_(True),
ColumnMetadata.table_id == table_id,
).options(load_only('id', 'name', 'example'))

DAYS_AGO = (
utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS))

first_column = persisted_columns.first()

if (first_column and
sample_updated_at and
first_column.example and
parser.parse(sample_updated_at) > DAYS_AGO):
# Look at the first example in the persisted columns.
# If this is *not* empty AND sample_updated_at is recent, don't update sample
logger.info(u"task=update_sample state=abort - recent sample exists table_name=%s",
table_name)
return

sample = None
try:
sample = ds.query_runner.get_table_sample(table_name)
Expand All @@ -273,14 +295,9 @@ def update_sample(data_source_id, table_name, table_id):
if not sample:
return

persisted_columns = ColumnMetadata.query.filter(
ColumnMetadata.exists.is_(True),
ColumnMetadata.table_id == table_id,
).options(load_only('id')).all()

# If a column exists, add a sample to it.
column_examples = []
for persisted_column in persisted_columns:
for persisted_column in persisted_columns.all():
column_example = sample.get(persisted_column.name, None)
column_example = column_example if isinstance(
column_example, unicode) else str(column_example)
Expand Down Expand Up @@ -330,7 +347,7 @@ def refresh_samples(data_source_id, table_sample_limit):
for table in tables_to_sample:
tasks.append(
update_sample.signature(
args=(ds.id, table.name, table.id),
args=(ds.id, table.name, table.id, table.sample_updated_at),
queue=settings.SCHEMAS_REFRESH_QUEUE
)
)
Expand Down
60 changes: 55 additions & 5 deletions tests/tasks/test_refresh_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def setUp(self):
self.patched_get_schema.return_value = self.default_schema_return_value

get_table_sample_patcher = patch('redash.query_runner.BaseQueryRunner.get_table_sample')
patched_get_table_sample = get_table_sample_patcher.start()
self.patched_get_table_sample = get_table_sample_patcher.start()
self.addCleanup(get_table_sample_patcher.stop)
patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE}
self.patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE}

def tearDown(self):
self.factory.data_source.query_runner.configuration['samples'] = False
Expand Down Expand Up @@ -87,7 +87,8 @@ def test_refresh_schema_creates_tables(self):
update_sample(
self.factory.data_source.id,
'table',
1
1,
"2019-05-09T17:07:52.386910Z"
)
table_metadata = TableMetadata.query.all()
column_metadata = ColumnMetadata.query.all()
Expand Down Expand Up @@ -182,7 +183,8 @@ def test_refresh_schema_update_column(self):
update_sample(
self.factory.data_source.id,
'table',
1
1,
"2019-05-09T17:07:52.386910Z"
)
column_metadata = ColumnMetadata.query.all()
self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
Expand Down Expand Up @@ -292,7 +294,8 @@ def test_refresh_schema_doesnt_overwrite_samples(self):
update_sample(
self.factory.data_source.id,
'table',
1
1,
"2019-05-09T17:07:52.386910Z"
)
column_metadata = ColumnMetadata.query.first()
self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
Expand All @@ -317,3 +320,50 @@ def test_refresh_samples_applied_to_one_data_source(self):
TableMetadata.sample_updated_at.isnot(None)
)
self.assertEqual(table_metadata.count(), len(self.default_schema_return_value))

def test_recent_empty_sample_refreshs(self):
    """An empty stored example is re-sampled even when the table's
    sample timestamp is only five days old."""
    self.factory.data_source.query_runner.configuration['samples'] = True
    refresh_schema(self.factory.data_source.id)

    # Right after the schema refresh the column has no example yet.
    first_column = ColumnMetadata.query.first()
    self.assertEqual(first_column.example, None)

    # Pretend the table was last sampled five days ago.
    five_days_ago = utils.utcnow() - datetime.timedelta(days=5)
    update_sample(self.factory.data_source.id, 'table', 1, five_days_ago.isoformat())

    # The empty example must have been replaced by the sampled value.
    first_column = ColumnMetadata.query.first()
    self.assertEqual(first_column.example, self.COLUMN_EXAMPLE)

def test_recent_non_empty_sample_doesnt_refresh(self):
    """A column that already holds an example keeps it when the table's
    sample timestamp is only five days old."""
    self.factory.data_source.query_runner.configuration['samples'] = True
    refresh_schema(self.factory.data_source.id)

    # First pass: no previous sample timestamp is supplied, so the
    # example gets populated from the patched query runner.
    update_sample(self.factory.data_source.id, 'table', 1, None)
    stored_column = ColumnMetadata.query.first()
    self.assertEqual(stored_column.example, self.COLUMN_EXAMPLE)

    # From now on the query runner would return a different example.
    self.patched_get_table_sample.return_value = {self.COLUMN_NAME: "a new example"}

    # Second pass: the sample is reported as updated five days ago.
    five_days_ago = utils.utcnow() - datetime.timedelta(days=5)
    update_sample(self.factory.data_source.id, 'table', 1, five_days_ago.isoformat())

    # The stored example is untouched; the new value was not picked up.
    stored_column = ColumnMetadata.query.first()
    self.assertEqual(stored_column.example, self.COLUMN_EXAMPLE)

0 comments on commit 4d800d8

Please sign in to comment.