Feature/Fix: Get a full time series for your filter instead of Topn for each point in time (#3434)
fabianmenges authored and mistercrunch committed Sep 18, 2017
1 parent 6fe93e1 commit c3c9ceb
Showing 1 changed file with 59 additions and 27 deletions.
86 changes: 59 additions & 27 deletions superset/connectors/druid/models.py
@@ -1,4 +1,4 @@
# pylint: disable=invalid-unary-operand-type
from collections import OrderedDict
import json
import logging
@@ -798,6 +798,28 @@ def values_for_column(self,
def get_query_str(self, query_obj, phase=1, client=None):
return self.run_query(client=client, phase=phase, **query_obj)

+    def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
+        ret = dim_filter
+        if df is not None and not df.empty:
+            new_filters = []
+            for unused, row in df.iterrows():
+                fields = []
+                for dim in dimensions:
+                    f = Dimension(dim) == row[dim]
+                    fields.append(f)
+                if len(fields) > 1:
+                    term = Filter(type="and", fields=fields)
+                    new_filters.append(term)
+                elif fields:
+                    new_filters.append(fields[0])
+            if new_filters:
+                ff = Filter(type="or", fields=new_filters)
+                if not dim_filter:
+                    ret = ff
+                else:
+                    ret = Filter(type="and", fields=[ff, dim_filter])
+        return ret
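
A minimal standalone sketch of the filter this new helper builds, assuming pydruid's Dimension/Filter helpers and a pandas frame standing in for the pre-query result (the column and dimension names here are invented, not from the commit):

    import pandas as pd
    from pydruid.utils.filters import Dimension, Filter

    # Hypothetical phase-one result: the top (country, device) pairs.
    df = pd.DataFrame([
        {'country': 'US', 'device': 'mobile'},
        {'country': 'FR', 'device': 'desktop'},
    ])

    new_filters = []
    for _, row in df.iterrows():
        # One term per row; multiple dimensions are AND-ed together.
        fields = [Dimension(dim) == row[dim] for dim in ['country', 'device']]
        new_filters.append(Filter(type='and', fields=fields))

    # The per-row terms are OR-ed, then AND-ed with any pre-existing filter.
    row_filter = Filter(type='or', fields=new_filters)
    existing = Dimension('browser') == 'chrome'
    combined = Filter(type='and', fields=[row_filter, existing])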

def run_query( # noqa / druid
self,
groupby, metrics,
@@ -834,7 +856,9 @@ def run_query( # noqa / druid

columns_dict = {c.column_name: c for c in self.columns}

-        all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict)
+        all_metrics, post_aggs = self._metrics_and_post_aggs(
+            metrics,
+            metrics_dict)

aggregations = OrderedDict()
for m in self.metrics:
@@ -884,15 +908,41 @@ def run_query( # noqa / druid
if having_filters:
qry['having'] = having_filters
order_direction = "descending" if order_desc else "ascending"
orig_filters = filters
if len(groupby) == 0 and not having_filters:
del qry['dimensions']
client.timeseries(**qry)
if not having_filters and len(groupby) == 1 and order_desc:
+            dim = list(qry.get('dimensions'))[0]
+            if timeseries_limit_metric:
+                order_by = timeseries_limit_metric
+            else:
+                order_by = list(qry['aggregations'].keys())[0]
+            # Limit on the number of timeseries, doing a two-phases query
+            pre_qry = deepcopy(qry)
+            pre_qry['granularity'] = "all"
+            pre_qry['threshold'] = min(row_limit,
+                                       timeseries_limit or row_limit)
+            pre_qry['metric'] = order_by
+            pre_qry['dimension'] = dim
+            del pre_qry['dimensions']
+            client.topn(**pre_qry)
+            query_str += "// Two phase query\n// Phase 1\n"
+            query_str += json.dumps(
+                client.query_builder.last_query.query_dict, indent=2)
+            query_str += "\n"
+            if phase == 1:
+                return query_str
+            query_str += (
+                "//\nPhase 2 (built based on phase one's results)\n")
+            df = client.export_pandas()
+            qry['filter'] = self._add_filter_from_pre_query_data(
+                df,
+                qry['dimensions'], filters)
qry['threshold'] = timeseries_limit or 1000
if row_limit and granularity == 'all':
qry['threshold'] = row_limit
-            qry['dimension'] = list(qry.get('dimensions'))[0]
+            qry['dimension'] = dim
del qry['dimensions']
qry['metric'] = list(qry['aggregations'].keys())[0]
client.topn(**qry)
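
This is the core of the change: the single-dimension path now runs a pre-query at granularity "all" to pin the top dimension values, then reruns the real query filtered to those values. A hedged standalone sketch of the pattern, with an invented broker URL, datasource, and column names (only the client calls mirror the diff):

    from pydruid.client import PyDruid
    from pydruid.utils.aggregators import longsum
    from pydruid.utils.filters import Dimension, Filter

    client = PyDruid('http://localhost:8082', 'druid/v2')

    # Phase 1: one topN over the whole interval (granularity "all")
    # finds the top 5 countries by total views.
    client.topn(
        datasource='pageviews',
        granularity='all',
        intervals='2017-01-01/2017-02-01',
        aggregations={'views': longsum('views')},
        dimension='country',
        metric='views',
        threshold=5)
    top_df = client.export_pandas()

    # Phase 2: the same query per day, filtered to the phase-one winners,
    # so each winner keeps a full, gap-free time series.
    client.topn(
        datasource='pageviews',
        granularity='day',
        intervals='2017-01-01/2017-02-01',
        aggregations={'views': longsum('views')},
        dimension='country',
        metric='views',
        threshold=5,
        filter=Filter(type='or', fields=[
            Dimension('country') == c for c in top_df['country']]))
    result = client.export_pandas()

Before this change, a single topN was issued per time bucket, so the top N could differ from bucket to bucket and each series only appeared where it happened to rank; the pre-query fixes the membership once.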
@@ -908,7 +958,7 @@ def run_query( # noqa / druid
pre_qry['granularity'] = "all"
                pre_qry['limit_spec'] = {
                    "type": "default",
-                    "limit": timeseries_limit,
+                    "limit": min(timeseries_limit, row_limit),
'intervals': (
inner_from_dttm.isoformat() + '/' +
inner_to_dttm.isoformat()),
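
The groupby path already used this two-phase scheme; the behavioral change in this hunk is only that the phase-one limit is now capped by row_limit. A toy sketch of the resulting limitSpec, reusing the diff's variable names with invented values:

    # Invented values; order_by/order_direction as in the surrounding code.
    timeseries_limit, row_limit = 500, 100
    order_by, order_direction = 'count', 'descending'

    limit_spec = {
        'type': 'default',
        'limit': min(timeseries_limit, row_limit),  # now capped at 100, not 500
        'columns': [{
            'dimension': order_by,
            'direction': order_direction,
        }],
    }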
@@ -927,29 +977,10 @@ def run_query( # noqa / druid
query_str += (
"//\nPhase 2 (built based on phase one's results)\n")
df = client.export_pandas()
-            if df is not None and not df.empty:
-                dims = qry['dimensions']
-                filters = []
-                for unused, row in df.iterrows():
-                    fields = []
-                    for dim in dims:
-                        f = Dimension(dim) == row[dim]
-                        fields.append(f)
-                    if len(fields) > 1:
-                        filt = Filter(type="and", fields=fields)
-                        filters.append(filt)
-                    elif fields:
-                        filters.append(fields[0])
-
-                if filters:
-                    ff = Filter(type="or", fields=filters)
-                    if not orig_filters:
-                        qry['filter'] = ff
-                    else:
-                        qry['filter'] = Filter(type="and", fields=[
-                            ff,
-                            orig_filters])
-            qry['limit_spec'] = None
+            qry['filter'] = self._add_filter_from_pre_query_data(
+                df,
+                qry['dimensions'], filters)
+            qry['limit_spec'] = None
if row_limit:
qry['limit_spec'] = {
"type": "default",
@@ -1111,5 +1142,6 @@ def query_datasources_by_name(
.all()
)


sa.event.listen(DruidDatasource, 'after_insert', set_perm)
sa.event.listen(DruidDatasource, 'after_update', set_perm)
