diff --git a/setup.py b/setup.py index 6a3a18cf97834..ad86b08959ae0 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ def get_git_sha(): 'pandas==0.20.2', 'parsedatetime==2.0.0', 'pydruid==0.3.1', - 'PyHive>=0.3.0', + 'PyHive>=0.4.0', 'python-dateutil==2.6.0', 'requests==2.17.3', 'simplejson==3.10.0', diff --git a/superset/assets/javascripts/SqlLab/components/ResultSet.jsx b/superset/assets/javascripts/SqlLab/components/ResultSet.jsx index c9814ec214adc..f306050696643 100644 --- a/superset/assets/javascripts/SqlLab/components/ResultSet.jsx +++ b/superset/assets/javascripts/SqlLab/components/ResultSet.jsx @@ -155,6 +155,7 @@ export default class ResultSet extends React.PureComponent { } if (['running', 'pending', 'fetching'].indexOf(query.state) > -1) { let progressBar; + let trackingUrl; if (query.progress > 0 && query.state === 'running') { progressBar = ( ); } + if (query.trackingUrl) { + trackingUrl = ( + + ); + } return (
Loading... {progressBar} +
+ {trackingUrl} +
); } else if (query.state === 'failed') { diff --git a/superset/config.py b/superset/config.py index 6c38fa2f76b51..2c27415bf2983 100644 --- a/superset/config.py +++ b/superset/config.py @@ -241,6 +241,7 @@ class CeleryConfig(object): CELERY_IMPORTS = ('superset.sql_lab', ) CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite' CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}} + CELERYD_LOG_LEVEL = 'DEBUG' CELERY_CONFIG = CeleryConfig """ CELERY_CONFIG = None diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index d08f2a8feb795..efe09a2d6973d 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -637,6 +637,21 @@ class HiveEngineSpec(PrestoEngineSpec): engine = 'hive' cursor_execute_kwargs = {'async': True} + # Scoping regex at class level to avoid recompiling + # 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5 + jobs_stats_r = re.compile( + r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)') + # 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5 + launching_job_r = re.compile( + '.*INFO.*Launching Job (?P<job_number>[0-9]+) out of ' + '(?P<max_jobs>[0-9]+)') + # 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18 + # map = 0%, reduce = 0% + stage_progress_r = re.compile( + r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*' + r'map = (?P<map_progress>[0-9]+)%.*' + r'reduce = (?P<reduce_progress>[0-9]+)%.*') + @classmethod def patch(cls): from pyhive import hive @@ -666,38 +681,27 @@ def adjust_database_uri(cls, uri, selected_schema=None): return uri @classmethod - def progress(cls, logs): - # 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5 - jobs_stats_r = re.compile( - r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)') - # 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5 - launching_job_r = re.compile( - '.*INFO.*Launching Job (?P<job_number>[0-9]+) out of ' - '(?P<max_jobs>[0-9]+)') - # 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18 - # map = 0%, reduce = 0% - stage_progress = re.compile( - r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*' - r'map = (?P<map_progress>[0-9]+)%.*' - r'reduce = (?P<reduce_progress>[0-9]+)%.*') - 
total_jobs = None + def progress(cls, log_lines): + total_jobs = 1 # assuming there's at least 1 job current_job = None stages = {} - lines = logs.splitlines() - for line in lines: - match = jobs_stats_r.match(line) + for line in log_lines: + match = cls.jobs_stats_r.match(line) if match: - total_jobs = int(match.groupdict()['max_jobs']) - match = launching_job_r.match(line) + total_jobs = int(match.groupdict()['max_jobs']) or 1 + match = cls.launching_job_r.match(line) if match: current_job = int(match.groupdict()['job_number']) stages = {} - match = stage_progress.match(line) + match = cls.stage_progress_r.match(line) if match: stage_number = int(match.groupdict()['stage_number']) map_progress = int(match.groupdict()['map_progress']) reduce_progress = int(match.groupdict()['reduce_progress']) stages[stage_number] = (map_progress + reduce_progress) / 2 + logging.info( + "Progress detail: {}, " + "total jobs: {}".format(stages, total_jobs)) if not total_jobs or not current_job: return 0 @@ -709,6 +713,13 @@ def progress(cls, logs): ) return int(progress) + @classmethod + def get_tracking_url(cls, log_lines): + lkp = "Tracking URL = " + for line in log_lines: + if lkp in line: + return line.split(lkp)[1] + @classmethod def handle_cursor(cls, cursor, query, session): """Updates progress information""" @@ -718,18 +729,35 @@ def handle_cursor(cls, cursor, query, session): hive.ttypes.TOperationState.RUNNING_STATE, ) polled = cursor.poll() + last_log_line = 0 + tracking_url = None while polled.operationState in unfinished_states: query = session.query(type(query)).filter_by(id=query.id).one() if query.status == QueryStatus.STOPPED: cursor.cancel() break - logs = cursor.fetch_logs() - if logs: - progress = cls.progress(logs) + resp = cursor.fetch_logs() + if resp and resp.log: + log = resp.log or '' + log_lines = resp.log.splitlines() + logging.info("\n".join(log_lines[last_log_line:])) + last_log_line = len(log_lines) - 1 + progress = cls.progress(log_lines) + 
logging.info("Progress total: {}".format(progress)) + needs_commit = False if progress > query.progress: query.progress = progress - session.commit() + needs_commit = True + if not tracking_url: + tracking_url = cls.get_tracking_url(log_lines) + if tracking_url: + logging.info( + "Found the tracking url: {}".format(tracking_url)) + query.tracking_url = tracking_url + needs_commit = True + if needs_commit: + session.commit() time.sleep(5) polled = cursor.poll() diff --git a/superset/migrations/versions/ca69c70ec99b_tracking_url.py b/superset/migrations/versions/ca69c70ec99b_tracking_url.py new file mode 100644 index 0000000000000..8a2ef38295c67 --- /dev/null +++ b/superset/migrations/versions/ca69c70ec99b_tracking_url.py @@ -0,0 +1,23 @@ +"""tracking_url + +Revision ID: ca69c70ec99b +Revises: a65458420354 +Create Date: 2017-07-26 20:09:52.606416 + +""" + +# revision identifiers, used by Alembic. +revision = 'ca69c70ec99b' +down_revision = 'a65458420354' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + + +def upgrade(): + op.add_column('query', sa.Column('tracking_url', sa.Text(), nullable=True)) + + +def downgrade(): + op.drop_column('query', 'tracking_url') diff --git a/superset/models/sql_lab.py b/superset/models/sql_lab.py index 00eb388150041..e2e125ad2438a 100644 --- a/superset/models/sql_lab.py +++ b/superset/models/sql_lab.py @@ -69,6 +69,7 @@ class Query(Model): start_running_time = Column(Numeric(precision=20, scale=6)) end_time = Column(Numeric(precision=20, scale=6)) end_result_backend_time = Column(Numeric(precision=20, scale=6)) + tracking_url = Column(Text) changed_on = Column( DateTime, @@ -119,6 +120,7 @@ def to_dict(self): 'user': self.user.username, 'limit_reached': self.limit_reached, 'resultsKey': self.results_key, + 'trackingUrl': self.tracking_url, } @property diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 638b29abbee19..55130cdaed618 100644 --- a/superset/sql_lab.py +++ 
b/superset/sql_lab.py @@ -192,6 +192,9 @@ def handle_error(msg): conn.close() return handle_error(db_engine_spec.extract_error_message(e)) + logging.info("Fetching cursor description") + cursor_description = cursor.description + conn.commit() conn.close() @@ -203,7 +206,7 @@ def handle_error(msg): }, default=utils.json_iso_dttm_ser) column_names = ( - [col[0] for col in cursor.description] if cursor.description else []) + [col[0] for col in cursor_description] if cursor_description else []) column_names = dedup(column_names) cdf = dataframe.SupersetDataFrame(pd.DataFrame( list(data), columns=column_names)) diff --git a/superset/views/core.py b/superset/views/core.py index eded30904f4bf..d5a31260c0607 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -427,7 +427,7 @@ class SliceAddView(SliceModelView): # noqa class DashboardModelView(SupersetModelView, DeleteMixin): # noqa datamodel = SQLAInterface(models.Dashboard) - + list_title = _('List Dashboards') show_title = _('Show Dashboard') add_title = _('Add Dashboard') @@ -2030,6 +2030,7 @@ def sql_json(self): # Async request. if async: + logging.info("Running query on a Celery worker") # Ignore the celery future object and the request may time out. 
try: sql_lab.get_sql_results.delay( diff --git a/tests/db_engine_specs_test.py b/tests/db_engine_specs_test.py index 626a97bb3f9c3..a3038132c0a8c 100644 --- a/tests/db_engine_specs_test.py +++ b/tests/db_engine_specs_test.py @@ -5,7 +5,7 @@ import unittest -from superset import db_engine_specs +from superset.db_engine_specs import HiveEngineSpec class DbEngineSpecsTestCase(unittest.TestCase): @@ -13,36 +13,38 @@ def test_0_progress(self): log = """ 17/02/07 18:26:27 INFO log.PerfLogger: 17/02/07 18:26:27 INFO log.PerfLogger: - """ - self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals( + 0, HiveEngineSpec.progress(log)) def test_0_progress(self): log = """ 17/02/07 18:26:27 INFO log.PerfLogger: 17/02/07 18:26:27 INFO log.PerfLogger: - """ - self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals( + 0, HiveEngineSpec.progress(log)) def test_number_of_jobs_progress(self): log = """ 17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2 - """ - self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(0, HiveEngineSpec.progress(log)) def test_job_1_launched_progress(self): log = """ 17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2 17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2 - """ - self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(0, HiveEngineSpec.progress(log)) def test_job_1_launched_stage_1_0_progress(self): log = """ 17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2 17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0% - """ - self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(0, HiveEngineSpec.progress(log)) def test_job_1_launched_stage_1_map_40_progress(self): log = """ @@ -50,8 +52,8 @@ def 
test_job_1_launched_stage_1_map_40_progress(self): 17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0% 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0% - """ - self.assertEquals(10, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(10, HiveEngineSpec.progress(log)) def test_job_1_launched_stage_1_map_80_reduce_40_progress(self): log = """ @@ -60,8 +62,8 @@ def test_job_1_launched_stage_1_map_80_reduce_40_progress(self): 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0% 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0% 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40% - """ - self.assertEquals(30, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(30, HiveEngineSpec.progress(log)) def test_job_1_launched_stage_2_stages_progress(self): log = """ @@ -72,8 +74,8 @@ def test_job_1_launched_stage_2_stages_progress(self): 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40% 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-2 map = 0%, reduce = 0% 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 100%, reduce = 0% - """ - self.assertEquals(12, db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(12, HiveEngineSpec.progress(log)) def test_job_2_launched_stage_2_stages_progress(self): log = """ @@ -83,5 +85,5 @@ def test_job_2_launched_stage_2_stages_progress(self): 17/02/07 19:15:55 INFO ql.Driver: Launching Job 2 out of 2 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0% 17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0% - """ - self.assertEquals(60, 
db_engine_specs.HiveEngineSpec.progress(log)) + """.split('\n') + self.assertEquals(60, HiveEngineSpec.progress(log)) diff --git a/tests/sqllab_tests.py b/tests/sqllab_tests.py index 9e59adc7dd952..29d74f4dc3502 100644 --- a/tests/sqllab_tests.py +++ b/tests/sqllab_tests.py @@ -189,12 +189,9 @@ def test_search_query_on_time(self): from_time = 'from={}'.format(int(first_query_time)) to_time = 'to={}'.format(int(second_query_time)) params = [from_time, to_time] - resp = self.get_resp('/superset/search_queries?'+'&'.join(params)) + resp = self.get_resp('/superset/search_queries?' + '&'.join(params)) data = json.loads(resp) self.assertEquals(2, len(data)) - for k in data: - self.assertLess(int(first_query_time), k['startDttm']) - self.assertLess(k['startDttm'], int(second_query_time)) def test_alias_duplicate(self): self.run_sql(