From 26a9e512d3e30a3dbffc65ff4965aa68bbf29988 Mon Sep 17 00:00:00 2001
From: Marina Samuel
Date: Fri, 8 Jun 2018 13:23:48 -0400
Subject: [PATCH] [WIP] Closes #415: Make data source health monitoring an extension.

---
 redash/extensions.py            | 14 +++++++-
 redash/monitor.py               |  2 --
 redash/query_runner/__init__.py |  6 ++--
 redash/settings/__init__.py     | 13 +-------
 redash/settings/helpers.py      |  6 ----
 redash/tasks/__init__.py        |  1 -
 redash/tasks/health.py          | 59 ---------------------------------
 redash/worker.py                | 13 +++++---
 8 files changed, 25 insertions(+), 89 deletions(-)
 delete mode 100644 redash/tasks/health.py

diff --git a/redash/extensions.py b/redash/extensions.py
index 78125842d3..d4906ee62c 100644
--- a/redash/extensions.py
+++ b/redash/extensions.py
@@ -8,7 +8,19 @@ def init_extensions(app):
     if not hasattr(app, 'redash_extensions'):
         app.redash_extensions = {}
 
+    if not hasattr(app, 'task_extensions'):
+        app.task_extensions = {}
+
     for entry_point in iter_entry_points('redash.extensions'):
         app.logger.info('Loading Redash extension %s.', entry_point.name)
         extension = entry_point.load()
-        app.redash_extensions[entry_point.name] = extension(app)
+        extension_response = extension(app)
+        extension_repository = app.redash_extensions
+
+        # Extensions that return a task and an interval are periodic tasks.
+        if (isinstance(extension_response, dict) and
+            "task" in extension_response and
+            "interval_in_seconds" in extension_response):
+            extension_repository = app.task_extensions
+
+        extension_repository[entry_point.name] = extension_response
diff --git a/redash/monitor.py b/redash/monitor.py
index 7663f717e3..34a2903fdc 100644
--- a/redash/monitor.py
+++ b/redash/monitor.py
@@ -1,4 +1,3 @@
-import json
 from redash import redis_connection, models, __version__, settings
 
 
@@ -18,7 +17,6 @@ def get_status():
     status['workers'] = []
 
     status['manager'] = redis_connection.hgetall('redash:status')
-    status['data_sources'] = json.loads(redis_connection.get('data_sources:health') or '{}')
 
     queues = {}
     for ds in models.DataSource.query:
diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py
index e88ec28ddf..2851587f78 100644
--- a/redash/query_runner/__init__.py
+++ b/redash/query_runner/__init__.py
@@ -102,12 +102,10 @@ def get_data_source_version(self):
 
         return version
 
-    def test_connection(self, custom_query_text=None):
+    def test_connection(self):
         if self.noop_query is None:
             raise NotImplementedError()
-
-        query_text = custom_query_text or self.noop_query
-        data, error = self.run_query(query_text, None)
+        data, error = self.run_query(self.noop_query, None)
 
         if error is not None:
             raise Exception(error)
diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py
index 1e11536b09..535a3904ee 100644
--- a/redash/settings/__init__.py
+++ b/redash/settings/__init__.py
@@ -1,7 +1,7 @@
 import os
 from funcy import distinct, remove
 
-from .helpers import parse_db_url, fix_assets_path, array_from_string, parse_boolean, int_or_none, set_from_string, dict_from_string
+from .helpers import parse_db_url, fix_assets_path, array_from_string, parse_boolean, int_or_none, set_from_string
 
 
 def all_settings():
@@ -241,14 +241,3 @@ def all_settings():
 # Allow Parameters in Embeds
 # WARNING: With this option enabled, Redash reads query parameters from the request URL (risk of SQL injection!)
 ALLOW_PARAMETERS_IN_EMBEDS = parse_boolean(os.environ.get("REDASH_ALLOW_PARAMETERS_IN_EMBEDS", "false"))
-
-# Allow for a map of custom queries to test data source performance and availability. 
-# A sample map may look like:
-# {
-# "1": "select 1;",
-# "5": "select 1;"
-# }
-CUSTOM_HEALTH_QUERIES = dict_from_string(os.environ.get("REDASH_CUSTOM_HEALTH_QUERIES", ""))
-
-# Frequency of health query runs in minutes (12 hours by default)
-HEALTH_QUERIES_REFRESH_SCHEDULE = int(os.environ.get("REDASH_HEALTH_QUERIES_REFRESH_SCHEDULE", 720))
diff --git a/redash/settings/helpers.py b/redash/settings/helpers.py
index cfe69bd38d..e55d61001d 100644
--- a/redash/settings/helpers.py
+++ b/redash/settings/helpers.py
@@ -33,12 +33,6 @@ def array_from_string(s):
     return [item.strip() for item in array]
 
 
-def dict_from_string(s):
-    try:
-        return json.loads(s)
-    except ValueError:
-        return {}
-
 
 def set_from_string(s):
     return set(array_from_string(s))
diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py
index 05d51aae50..f242e4c516 100644
--- a/redash/tasks/__init__.py
+++ b/redash/tasks/__init__.py
@@ -1,4 +1,3 @@
 from .general import record_event, version_check, send_mail
-from .health import health_status
 from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_tasks, cleanup_query_results, execute_query
 from .alerts import check_alerts_for_query
\ No newline at end of file
diff --git a/redash/tasks/health.py b/redash/tasks/health.py
deleted file mode 100644
index 2502b32ddf..0000000000
--- a/redash/tasks/health.py
+++ /dev/null
@@ -1,59 +0,0 @@
-import json
-import time
-from random import randint
-
-from celery.utils.log import get_task_logger
-from redash import models, redis_connection, settings, statsd_client
-from redash.worker import celery
-from redash.utils import parse_human_time
-
-logger = get_task_logger(__name__)
-
-
-def update_health_status(data_source_id, data_source_name, query_text, data):
-    key = "data_sources:health"
-
-    cache = json.loads(redis_connection.get(key) or '{}')
-    if data_source_id not in cache:
-        cache[data_source_id] = {
-            "metadata": { "name": data_source_name },
-            "queries": {}
-        }
-    cache[data_source_id]["queries"][query_text] = data
-
-    cache[data_source_id]["status"] = "SUCCESS"
-    for query_status in cache[data_source_id]["queries"].values():
-        if query_status["status"] == "FAIL":
-            cache[data_source_id]["status"] = "FAIL"
-            break
-
-    redis_connection.set(key, json.dumps(cache))
-
-@celery.task(name="redash.tasks.health_status", time_limit=90, soft_time_limit=60)
-def health_status():
-    for ds in models.DataSource.query:
-        logger.info(u"task=health_status state=start ds_id=%s", ds.id)
-
-        runtime = None
-        query_text = ds.query_runner.noop_query
-        custom_queries = settings.CUSTOM_HEALTH_QUERIES
-        ds_id = str(ds.id)
-
-        if custom_queries and ds_id in custom_queries:
-            query_text = custom_queries[ds_id]
-
-        try:
-            start_time = time.time()
-            ds.query_runner.test_connection(query_text)
-            runtime = time.time() - start_time
-        except Exception as e:
-            logger.warning(u"Failed health check for the data source: %s", ds.name, exc_info=1)
-            statsd_client.incr('health_status.error')
-            logger.info(u"task=health_status state=error ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
-
-        update_health_status(ds_id, ds.name, query_text, {
-            "status": "SUCCESS" if runtime is not None else "FAIL",
-            "last_run": start_time,
-            "last_run_human": str(parse_human_time(str(start_time))),
-            "runtime": runtime
-        })
diff --git a/redash/worker.py b/redash/worker.py
index 668da00735..e08b2ae794 100644
--- a/redash/worker.py
+++ b/redash/worker.py
@@ -16,10 +16,6 @@
                 include='redash.tasks')
 
 celery_schedule = {
-    'health_status': {
-        'task': 'redash.tasks.health_status',
-        'schedule': timedelta(minutes=settings.HEALTH_QUERIES_REFRESH_SCHEDULE)
-    },
     'refresh_queries': {
         'task': 'redash.tasks.refresh_queries',
         'schedule': timedelta(seconds=30)
@@ -82,3 +78,12 @@ def __call__(self, *args, **kwargs):
 def init_celery_flask_app(**kwargs):
     app = create_app()
     app.app_context().push()
+
+@celery.on_after_configure.connect
+def add_extension_tasks(sender, **kwargs):
+    app = create_app()
+    for extension_task in app.task_extensions.values():
+        interval = extension_task["interval_in_seconds"]
+        task = extension_task["task"]
+        task.delay()
+        sender.add_periodic_task(interval, task.s(), name=task.name)
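
Note for reviewers: after this patch, an entry point in the 'redash.extensions' group can return either a plain extension object, which is stored in app.redash_extensions as before, or a dict of the shape {"task": <celery task>, "interval_in_seconds": <int>}, which is collected in app.task_extensions and scheduled by the worker. The sketch below shows what an external health-check extension built on that contract might look like; the package name (redash_health), the task body, and the 300-second interval are illustrative assumptions, not part of this patch.

# redash_health/extension.py (hypothetical external package)
from celery.utils.log import get_task_logger

from redash import models
from redash.worker import celery

logger = get_task_logger(__name__)


@celery.task(name="redash_health.health_check", time_limit=90, soft_time_limit=60)
def health_check():
    # Run each data source's no-op query and log the outcome.
    for ds in models.DataSource.query:
        try:
            ds.query_runner.test_connection()
            logger.info("health_check ds_id=%s status=SUCCESS", ds.id)
        except Exception:
            logger.warning("health_check ds_id=%s status=FAIL", ds.id, exc_info=1)


def extension(app):
    # Returning this shape routes the extension into app.task_extensions,
    # so the worker picks it up via add_periodic_task().
    return {"task": health_check, "interval_in_seconds": 300}

Because init_extensions() only checks for the "task" and "interval_in_seconds" keys, existing extensions that return anything else keep landing in app.redash_extensions unchanged.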
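
For init_extensions() to discover the task at all, the extension package has to advertise it in the 'redash.extensions' entry point group that iter_entry_points() scans. A minimal setup.py for the hypothetical package above could register it like this (names again illustrative):

# setup.py for the hypothetical redash-health package
from setuptools import setup

setup(
    name="redash-health",
    version="0.1.0",
    packages=["redash_health"],
    entry_points={
        "redash.extensions": [
            # "<entry point name> = <module>:<callable>"; init_extensions() calls the callable with the Flask app.
            "health_check = redash_health.extension:extension",
        ],
    },
)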