diff --git a/setup.py b/setup.py
index 05b6620cde9f3..b4f58e2c625dc 100644
--- a/setup.py
+++ b/setup.py
@@ -33,7 +33,7 @@
     'markdown==2.6.6',
     'pandas==0.18.1',
     'parsedatetime==2.0.0',
-    'pydruid==0.3.0',
+    'pydruid==0.3.1',
     'PyHive>=0.2.1',
     'python-dateutil==2.5.3',
     'requests==2.10.0',
diff --git a/superset/cli.py b/superset/cli.py
index bd99ee66efc37..86be0a30393f9 100755
--- a/superset/cli.py
+++ b/superset/cli.py
@@ -82,13 +82,19 @@ def load_examples(load_test_data):
     help=(
         "Specify which datasource name to load, if omitted, all "
         "datasources will be refreshed"))
-def refresh_druid(datasource):
+@manager.option(
+    '-m', '--merge',
+    help=(
+        "Specify using 'merge' property during operation. "
+        "Default value is False "))
+def refresh_druid(datasource, merge):
     """Refresh druid datasources"""
     session = db.session()
     from superset import models
     for cluster in session.query(models.DruidCluster).all():
         try:
-            cluster.refresh_datasources(datasource_name=datasource)
+            cluster.refresh_datasources(datasource_name=datasource,
+                                        merge_flag=merge)
         except Exception as e:
             print(
                 "Error while processing cluster '{}'\n{}".format(
diff --git a/superset/models.py b/superset/models.py
index af7a08a11eced..4653725b76fc7 100644
--- a/superset/models.py
+++ b/superset/models.py
@@ -1602,7 +1602,7 @@ def get_druid_version(self):
         ).format(obj=self)
         return json.loads(requests.get(endpoint).text)['version']
 
-    def refresh_datasources(self, datasource_name=None):
+    def refresh_datasources(self, datasource_name=None, merge_flag=False):
         """Refresh metadata of all datasources in the cluster
 
         If ``datasource_name`` is specified, only that datasource is updated
@@ -1611,7 +1611,7 @@ def refresh_datasources(self, datasource_name=None):
         for datasource in self.get_datasources():
             if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'):
                 if not datasource_name or datasource_name == datasource:
-                    DruidDatasource.sync_to_db(datasource, self)
+                    DruidDatasource.sync_to_db(datasource, self, merge_flag)
 
     @property
     def perm(self):
@@ -1777,7 +1777,8 @@ def latest_metadata(self):
         try:
             segment_metadata = client.segment_metadata(
                 datasource=self.datasource_name,
-                intervals=lbound + '/' + rbound)
+                intervals=lbound + '/' + rbound,
+                merge=self.merge_flag)
         except Exception as e:
             logging.warning("Failed first attempt to get latest segment")
             logging.exception(e)
@@ -1790,7 +1791,8 @@ def latest_metadata(self):
             try:
                 segment_metadata = client.segment_metadata(
                     datasource=self.datasource_name,
-                    intervals=lbound + '/' + rbound)
+                    intervals=lbound + '/' + rbound,
+                    merge=self.merge_flag)
             except Exception as e:
                 logging.warning("Failed 2nd attempt to get latest segment")
                 logging.exception(e)
@@ -1876,7 +1878,7 @@ def sync_to_db_from_config(cls, druid_config, user, cluster):
         session.commit()
 
     @classmethod
-    def sync_to_db(cls, name, cluster):
+    def sync_to_db(cls, name, cluster, merge):
         """Fetches metadata for that datasource and merges the Superset db"""
         logging.info("Syncing Druid datasource [{}]".format(name))
         session = get_session()
@@ -1889,6 +1891,7 @@ def sync_to_db(cls, name, cluster):
             flasher("Refreshing datasource [{}]".format(name), "info")
         session.flush()
         datasource.cluster = cluster
+        datasource.merge_flag = merge
         session.flush()
 
         cols = datasource.latest_metadata()
diff --git a/tests/druid_tests.py b/tests/druid_tests.py
index 0da4b848f6c44..7a1b8da59caf1 100644
--- a/tests/druid_tests.py
+++ b/tests/druid_tests.py
@@ -100,6 +100,7 @@ def test_client(self, PyDruid):
         cluster.get_datasources = Mock(return_value=['test_datasource'])
         cluster.get_druid_version = Mock(return_value='0.9.1')
         cluster.refresh_datasources()
+        cluster.refresh_datasources(merge_flag=True)
         datasource_id = cluster.datasources[0].id
         db.session.commit()
 
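For quick manual verification, here is a minimal sketch (not part of the patch) that exercises the new `merge_flag` path directly from Python; it assumes a configured Superset application context with at least one `DruidCluster` row, and mirrors what the new `superset refresh_druid --merge True` CLI invocation would do:

```python
# Illustrative only: drive the new merge_flag path outside the CLI.
# Assumes a working Superset config and an existing DruidCluster record.
from superset import db, models

session = db.session()
for cluster in session.query(models.DruidCluster).all():
    # merge_flag is forwarded through DruidDatasource.sync_to_db and ends up
    # as the `merge` argument of pydruid's segment_metadata(), per the diff.
    cluster.refresh_datasources(datasource_name=None, merge_flag=True)
```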