feat(ingest/lookml): CLL support for sql set in sql_table_name attribute of lookml view (#11069)
sid-acryl authored Aug 7, 2024
1 parent 2755cf3 commit 8bea5d2
Showing 8 changed files with 394 additions and 78 deletions.
========== changed file ==========
@@ -96,13 +96,11 @@
ChartTypeClass,
ContainerClass,
DashboardInfoClass,
DataPlatformInfoClass,
InputFieldClass,
InputFieldsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
PlatformTypeClass,
SubTypesClass,
)
from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor
@@ -1573,25 +1571,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

looker_dashboards_for_usage: List[looker_usage.LookerDashboardForUsage] = []

# Emit platform instance entity
if self.source_config.platform_instance:
platform_instance_urn = builder.make_dataplatform_instance_urn(
platform=self.source_config.platform_name,
instance=self.source_config.platform_instance,
)

yield MetadataWorkUnit(
id=f"{platform_instance_urn}-aspect-dataplatformInfo",
mcp=MetadataChangeProposalWrapper(
entityUrn=platform_instance_urn,
aspect=DataPlatformInfoClass(
name=self.source_config.platform_instance,
type=PlatformTypeClass.OTHERS,
datasetNameDelimiter=".",
),
),
)

with self.reporter.report_stage("dashboard_chart_metadata"):
for job in BackpressureAwareExecutor.map(
self.process_dashboard,
========== changed file ==========
@@ -10,9 +10,6 @@
create_template,
)
from datahub.ingestion.source.looker.lookml_config import DERIVED_VIEW_PATTERN
from datahub.ingestion.source.looker.str_functions import (
remove_extra_spaces_and_newlines,
)

logger = logging.getLogger(__name__)

@@ -95,6 +92,11 @@ def resolve_liquid_variable(text: str, liquid_variable: Dict[Any, Any]) -> str:
return text


def _drop_derived_view_pattern(value: str) -> str:
# Drop ${ and }
return re.sub(DERIVED_VIEW_PATTERN, r"\1", value)


def _complete_incomplete_sql(raw_view: dict, sql: str) -> str:

# Looker supports sql fragments that omit the SELECT and FROM parts of the query
@@ -109,8 +111,7 @@ def _complete_incomplete_sql(raw_view: dict, sql: str) -> str:
# add a FROM clause at the end
sql_query = f"{sql_query} FROM {raw_view['name']}"

# Drop ${ and }
return re.sub(DERIVED_VIEW_PATTERN, r"\1", sql_query)
return _drop_derived_view_pattern(sql_query)


def resolve_liquid_variable_in_view_dict(
@@ -122,10 +123,14 @@ def resolve_liquid_variable_in_view_dict(
for view in raw_view["views"]:
if "sql_table_name" in view:
view["datahub_transformed_sql_table_name"] = resolve_liquid_variable(
text=remove_extra_spaces_and_newlines(view["sql_table_name"]),
text=view["sql_table_name"],
liquid_variable=liquid_variable,
) # keep the original sql_table_name as-is to avoid any visualization issues later

view["datahub_transformed_sql_table_name"] = _drop_derived_view_pattern(
value=view["datahub_transformed_sql_table_name"]
)

if "derived_table" in view and "sql" in view["derived_table"]:
# In sql we don't need to remove the extra spaces, as the sql parser takes care of extra spaces and \n
# while generating the URN from the sql
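
The new _drop_derived_view_pattern helper is now also applied to the liquid-resolved sql_table_name (see the hunk above), so a reference such as ${employee_total_income.SQL_TABLE_NAME} reaches later processing without the ${ } wrapper. A minimal standalone sketch of that substitution, assuming DERIVED_VIEW_PATTERN is a regex roughly of the shape shown below (the exact constant is defined in lookml_config and may differ):

import re

# Assumed shape of DERIVED_VIEW_PATTERN; the actual constant lives in
# datahub.ingestion.source.looker.lookml_config and may differ slightly.
DERIVED_VIEW_PATTERN = r"\$\{(.+?)\}"


def drop_derived_view_pattern(value: str) -> str:
    # Keep only the capture group, i.e. drop the surrounding ${ and }.
    return re.sub(DERIVED_VIEW_PATTERN, r"\1", value)


print(drop_derived_view_pattern("${employee_total_income.SQL_TABLE_NAME}"))
# -> employee_total_income.SQL_TABLE_NAME
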
========== changed file ==========
@@ -11,12 +11,14 @@
from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile
from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader
from datahub.ingestion.source.looker.lookml_config import (
DERIVED_VIEW_PATTERN,
DERIVED_VIEW_SUFFIX,
NAME,
LookMLSourceReport,
)
from datahub.ingestion.source.looker.lookml_refinement import LookerRefinementResolver
from datahub.ingestion.source.looker.str_functions import (
remove_extra_spaces_and_newlines,
)

logger = logging.getLogger(__name__)

@@ -56,7 +58,7 @@ def column_name_in_sql_attribute(self) -> List[str]:

class LookerViewContext:
"""
There are six patterns to associate the view's fields with dataset
There are seven patterns to associate the view's fields with dataset
Pattern1:
view: view_name {
@@ -161,6 +163,36 @@ class LookerViewContext:
For all possible options of "sql" attribute please refer looker doc:
https://cloud.google.com/looker/docs/reference/param-field-sql
For pattern 6, i.e. view.derived.sql, Looker creates a temporary table to store the sql result.
However, if we don't want a temporary table and instead want Looker to always execute the sql to fetch the result,
pattern 7 (described below) is useful.
Pattern7:
view: customer_sales {
sql_table_name: (
SELECT
customer_id,
SUM(sales_amount) AS total_sales
FROM
sales
GROUP BY
customer_id
) ;;
dimension: customer_id {
sql: ${TABLE}.customer_id ;;
}
measure: total_sales {
type: sum
sql: ${TABLE}.total_sales ;;
}
}
In Pattern 7 the fields' upstream dataset is the output of the sql defined in
customer_sales.sql_table_name.
"""

raw_view: Dict
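
To make pattern 7 concrete: the parenthesised query in customer_sales.sql_table_name is what ultimately gets parsed for lineage, so the upstream table is sales and the customer_id and total_sales fields map back to its columns. The snippet below is only an illustration of what a SQL parse recovers from that query; it calls sqlglot directly rather than the source's create_lineage_sql_parsed_result helper:

import sqlglot
from sqlglot import exp

# Illustration only: parse the pattern-7 subquery to see which table and
# columns the view's fields ultimately resolve to.
sql = """
SELECT
    customer_id,
    SUM(sales_amount) AS total_sales
FROM
    sales
GROUP BY
    customer_id
"""

parsed = sqlglot.parse_one(sql)
upstream_tables = sorted({t.name for t in parsed.find_all(exp.Table)})
upstream_columns = sorted({c.name for c in parsed.find_all(exp.Column)})

print(upstream_tables)   # ['sales']
print(upstream_columns)  # ['customer_id', 'sales_amount']
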
@@ -252,6 +284,7 @@ def _get_sql_table_name_field(self) -> Optional[str]:
return self.get_including_extends(field="sql_table_name")

def _is_dot_sql_table_name_present(self) -> bool:

sql_table_name: Optional[str] = self._get_sql_table_name_field()

if sql_table_name is None:
@@ -268,7 +301,7 @@ def sql_table_name(self) -> str:
if sql_table_name is None:
sql_table_name = self.raw_view[NAME].lower()

return sql_table_name
return sql_table_name.lower()

def datahub_transformed_sql_table_name(self) -> str:
table_name: Optional[str] = self.raw_view.get(
@@ -278,13 +311,13 @@ def datahub_transformed_sql_table_name(self) -> str:
if not table_name:
table_name = self.sql_table_name()

# sql_table_name is in the format "${view-name}.SQL_TABLE_NAME"
# remove extra characters
if self._is_dot_sql_table_name_present():
table_name = re.sub(DERIVED_VIEW_PATTERN, r"\1", table_name)
# remove extra spaces and newlines from sql_table_name if it is not a sql query
if not self.is_direct_sql_query_case():
table_name = remove_extra_spaces_and_newlines(table_name)
# Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes
table_name = table_name.replace('"', "").replace("`", "").lower()

# Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes
return table_name.replace('"', "").replace("`", "").lower()
return table_name

def derived_table(self) -> Dict[Any, Any]:
"""
@@ -371,6 +404,11 @@ def is_materialized_derived_view(self) -> bool:
def is_regular_case(self) -> bool:
# regular-case is pattern1 and 2 where upstream table is either view-name or
# table name mentioned in sql_table_name attribute

# It should not be the sql query
if self.is_direct_sql_query_case():
return False

if (
self.is_sql_table_name_referring_to_view()
or self.is_sql_based_derived_case()
Expand All @@ -381,6 +419,9 @@ def is_regular_case(self) -> bool:
return True

def is_sql_table_name_referring_to_view(self) -> bool:
if self.is_direct_sql_query_case():
return False

# It is pattern3
return self._is_dot_sql_table_name_present()

Expand Down Expand Up @@ -413,3 +454,14 @@ def is_sql_based_derived_view_without_fields_case(self) -> bool:
return True

return False

def is_direct_sql_query_case(self) -> bool:
# pattern 7
# sqlglot doesn't have a function to validate whether text is valid SQL or not.
# Apply a simple heuristic instead: if sql_table_name contains a sql query, its value starts with "(",
# so check for that prefix and for the presence of "select" in the text.
return (
self.sql_table_name().strip().startswith("(")
and "select" in self.sql_table_name()
)
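
The check above can be exercised on its own. A standalone sketch of the same heuristic with made-up inputs (the committed method reads the value via self.sql_table_name(), which is already lower-cased):

def looks_like_direct_sql_query(sql_table_name: str) -> bool:
    # Pattern 7 heuristic: an inline sql query starts with "(" and contains "select".
    value = sql_table_name.strip().lower()
    return value.startswith("(") and "select" in value


# The parenthesised pattern-7 query is detected; a plain table reference is not.
assert looks_like_direct_sql_query(
    "(SELECT customer_id, SUM(sales_amount) AS total_sales FROM sales GROUP BY customer_id)"
)
assert not looks_like_direct_sql_query("analytics.customer_sales")
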
========== changed file ==========
@@ -237,7 +237,7 @@ def create_fields(self) -> List[ViewField]:
return [] # it is for the special case


class SqlBasedDerivedViewUpstream(AbstractViewUpstream):
class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC):
"""
Handle the case where upstream dataset is defined in derived_table.sql
"""
@@ -263,7 +263,7 @@ def __get_spr(self) -> Optional[SqlParsingResult]:
return None

spr = create_lineage_sql_parsed_result(
query=self.view_context.datahub_transformed_sql(),
query=self.get_sql_query(),
default_schema=self.view_context.view_connection.default_schema,
default_db=self.view_context.view_connection.default_db,
platform=self.view_context.view_connection.platform,
@@ -390,6 +390,28 @@ def get_upstream_column_ref(
def get_upstream_dataset_urn(self) -> List[Urn]:
return self._get_upstream_dataset_urn()

@abstractmethod
def get_sql_query(self) -> str:
pass


class DirectQueryUpstreamSource(SqlBasedDerivedViewUpstream):
"""
Pattern 7 as per view-context documentation
"""

def get_sql_query(self) -> str:
return self.view_context.datahub_transformed_sql_table_name()


class DerivedQueryUpstreamSource(SqlBasedDerivedViewUpstream):
"""
Pattern 4 as per view-context documentation
"""

def get_sql_query(self) -> str:
return self.view_context.datahub_transformed_sql()


class NativeDerivedViewUpstream(AbstractViewUpstream):
"""
@@ -611,6 +633,7 @@ def create_view_upstream(
ctx: PipelineContext,
reporter: LookMLSourceReport,
) -> AbstractViewUpstream:

if view_context.is_regular_case():
return RegularViewUpstream(
view_context=view_context,
@@ -629,11 +652,23 @@
looker_view_id_cache=looker_view_id_cache,
)

if (
view_context.is_sql_based_derived_case()
or view_context.is_sql_based_derived_view_without_fields_case()
if any(
[
view_context.is_sql_based_derived_case(),
view_context.is_sql_based_derived_view_without_fields_case(),
]
):
return SqlBasedDerivedViewUpstream(

return DerivedQueryUpstreamSource(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)

if view_context.is_direct_sql_query_case():
return DirectQueryUpstreamSource(
view_context=view_context,
config=config,
reporter=reporter,
Expand All @@ -651,9 +686,9 @@ def create_view_upstream(
)

reporter.report_warning(
title="Implementation Not Found",
title="ViewUpstream Implementation Not Found",
message="No implementation found to resolve upstream of the view",
context=view_context.view_file_name(),
context=f"view_name={view_context.name()} , view_file_name={view_context.view_file_name()}",
)

return EmptyImplementation(
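
Taken together, SqlBasedDerivedViewUpstream becomes an ABC whose subclasses differ only in which SQL text they hand to the parser: DerivedQueryUpstreamSource (pattern 4) returns datahub_transformed_sql(), DirectQueryUpstreamSource (pattern 7) returns datahub_transformed_sql_table_name(), and create_view_upstream chooses between them from the view-context checks. A condensed sketch of that shape with simplified, made-up names (the real classes also take config, ctx, reporter and a looker_view_id_cache):

from abc import ABC, abstractmethod


class SqlUpstreamBase(ABC):
    """Shared lineage logic; only the SQL text that gets parsed differs."""

    def __init__(self, view: dict) -> None:
        self.view = view

    def upstream_lineage(self) -> str:
        # Stand-in for create_lineage_sql_parsed_result(...); the point is that
        # both subclasses flow through the same parsing path.
        return f"parse({self.get_sql_query()!r})"

    @abstractmethod
    def get_sql_query(self) -> str:
        ...


class DerivedSqlUpstream(SqlUpstreamBase):
    # Pattern 4: the SQL lives in derived_table.sql
    def get_sql_query(self) -> str:
        return self.view["derived_table"]["sql"]


class DirectSqlUpstream(SqlUpstreamBase):
    # Pattern 7: the SQL lives directly in sql_table_name
    def get_sql_query(self) -> str:
        return self.view["sql_table_name"]


def pick_upstream(view: dict) -> SqlUpstreamBase:
    # Greatly simplified dispatch; the real create_view_upstream also handles
    # regular views, native derived views and the empty fallback.
    if "derived_table" in view and "sql" in view["derived_table"]:
        return DerivedSqlUpstream(view)
    return DirectSqlUpstream(view)


print(pick_upstream({"sql_table_name": "(select 1 as x from dual)"}).upstream_lineage())
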
========== changed file ==========
@@ -1,22 +1,4 @@
[
{
"entityType": "dataPlatformInstance",
"entityUrn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInfo",
"aspect": {
"json": {
"name": "ap-south-1",
"type": "OTHERS",
"datasetNameDelimiter": "."
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:e7fe6fc9c3ca70e78694dcc5dd9c05b7",
@@ -805,22 +787,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataPlatformInstance",
"entityUrn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:looker,ap-south-1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "looker-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Dimension",
========== changed file ==========
@@ -6,6 +6,7 @@ include: "employee_total_income.view.lkml"
include: "top_10_employee_income_source.view.lkml"
include: "employee_tax_report.view.lkml"
include: "employee_salary_rating.view.lkml"
include: "rent_as_employee_income_source.view.lkml"

explore: activity_logs {
}
@@ -23,4 +24,7 @@ explore: employee_tax_report {
}

explore: employee_salary_rating {
}

explore: rent_as_employee_income_source {
}