
Sensor for Databricks partition and table changes #28950

Closed
wants to merge 59 commits

Conversation

harishkrao
Contributor

@harishkrao harishkrao commented Jan 15, 2023

Closes: #21381


Sensors for Databricks SQL to detect table partitions and new table events.

@harishkrao
Contributor Author

@alexott it would be great if you could review the PR and provide feedback. Thank you for your time.

Contributor

@alexott alexott left a comment

Thank you for your contribution!

I would make the DatabricksSqlSensor fully generic by allowing an arbitrary SQL expression to be passed in; the sensor triggers when that query returns a non-empty result. The partition and history-change sensors can then be built on top of it (see the sketch after the list below).

Also we need:

  • Documentation
  • The sensor should be declared in airflow/providers/databricks/provider.yaml
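A minimal sketch of that generic-sensor idea, assuming the provider's existing DatabricksSqlHook and the common fetch_all_handler (the exact signature in this PR may differ):

from typing import Any, Callable, Sequence

from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator


class DatabricksSqlSensor(BaseSensorOperator):
    """Succeeds as soon as the supplied SQL statement returns a non-empty result."""

    template_fields: Sequence[str] = ("sql",)

    def __init__(
        self,
        *,
        sql: str,
        databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
        handler: Callable[[Any], Any] = fetch_all_handler,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.sql = sql
        self.databricks_conn_id = databricks_conn_id
        self.handler = handler

    def poke(self, context) -> bool:
        # Run the user-supplied query; any non-empty result satisfies the sensor.
        hook = DatabricksSqlHook(databricks_conn_id=self.databricks_conn_id)
        result = hook.run(self.sql, handler=self.handler)
        return bool(result)

The partition and history-change sensors would then only need to build the appropriate SQL and reuse this poke logic.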

"""
Generic Databricks SQL sensor.

:param databricks_conn_id:str=DatabricksSqlHook.default_conn_name: Specify the name of the connection
Contributor

You don't need to include the type in the docstrings; just use :param {argname}: {description}. All information about the expected types is taken from the annotations.

Including the type here may cause issues when the documentation is generated.
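For example, a parameter documented in the preferred form would look roughly like this (illustrative fragment only):

def __init__(self, databricks_conn_id: str = "databricks_default") -> None:
    """
    :param databricks_conn_id: Name of the Databricks connection to use.
        The expected type is taken from the annotation, not from the docstring.
    """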

Contributor Author

Removed the type. Thanks for the feedback.

Comment on lines 125 to 131

Args:
context (Context): Airflow context
lookup_key (_type_): Unique lookup key used to store values related to a specific table.

Returns:
int: Version number
Contributor

We don't use Google style in docstrings; please use reStructuredText.
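The same information in reStructuredText field-list form would look roughly like this (the method name is hypothetical; the fields mirror the Args:/Returns: block quoted above):

def get_previous_version(self, context, lookup_key):
    """
    Return the previously stored version for a table.

    :param context: Airflow context
    :param lookup_key: Unique lookup key used to store values related to a specific table.
    :return: Version number
    """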

Contributor Author

Changed it.

partition_name: dict = {"date": "2023-1-1"},
handler: Callable[[Any], Any] = fetch_all_handler,
db_sensor_type: str,
timestamp: datetime = datetime.now() - timedelta(days=7),
Contributor

I think we do not need any default value for timestamp.
If it is a mandatory field, just make it mandatory and have the user provide the actual value here.

Contributor Author

The reason I thought it needs some default value is to extract history for a time period even when the user does not provide a custom value, for example the past 7 days.

@alexott
Contributor

alexott commented Jan 15, 2023

Also, it makes sense to declare some of the properties as templated, for example the partitions mapping, etc.

@alexott
Contributor

alexott commented Jan 15, 2023

Also, for the partition sensor it would make sense to allow specifying comparison operations on the partitions: not only = as a comparison, but also IN (if the value is a list), >, <, !=, and so on (a rough sketch follows).
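A rough sketch of what that operator handling could look like (the helper name and the escaping are illustrative only):

def build_partition_predicate(column: str, value, operator: str = "=") -> str:
    # Lists map naturally to IN; everything else uses the supplied comparison operator.
    if isinstance(value, (list, tuple)):
        rendered = ", ".join(repr(v) for v in value)
        return f"{column} IN ({rendered})"
    return f"{column} {operator} {value!r}"

# build_partition_predicate("date", ["2023-01-01", "2023-01-02"]) -> "date IN ('2023-01-01', '2023-01-02')"
# build_partition_predicate("year", 2022, ">=")                   -> "year >= 2022"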

@harishkrao
Contributor Author

Also, it makes sense to declare some of the properties as templated, for example the partitions mapping, etc.

@alexott agree, that would be good to implement. Do you have an example to follow for this one?

@alexott
Contributor

alexott commented Jan 16, 2023

Yes, just look at DatabricksSqlOperator; it allows templating the sql field. Just take into account that template expansion happens after __init__ is called (a short illustration follows).
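To illustrate (a sketch only; the field names follow this PR but the final set may differ), templated fields are declared as a class attribute, and the rendered values are only available after __init__:

from typing import Sequence

from airflow.sensors.base import BaseSensorOperator


class DatabricksPartitionSensor(BaseSensorOperator):
    template_fields: Sequence[str] = ("table_name", "partition_name")

    def __init__(self, *, table_name: str, partition_name: dict, **kwargs) -> None:
        super().__init__(**kwargs)
        self.table_name = table_name          # may still contain "{{ ... }}" at this point
        self.partition_name = partition_name  # Jinja rendering happens just before execution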

@harishkrao
Contributor Author

@alexott @Taragolis Thank you for taking the time to review the PR and for giving valuable feedback. I have addressed all of the comments. It would be great if you could review the changes. Thank you again!

@harishkrao harishkrao force-pushed the databricks-sql-sensor branch 4 times, most recently from 6a9e11b to 7781bd7 on January 29, 2023 03:03
Comment on lines 96 to 100
- integration-name: Databricks SQL
python-modules:
- airflow.providers.databricks.sensors.databricks_sql
- integration-name: Databricks Partition
python-modules:
- airflow.providers.databricks.sensors.databricks_partition
- integration-name: Databricks Table Changes
python-modules:
- airflow.providers.databricks.sensors.databricks_table_changes
Contributor

We can combine all of them under the Databricks SQL umbrella

table_name: str = "",
partition_name: dict,
handler: Callable[[Any], Any] = fetch_one_handler,
caller: str = "DatabricksPartitionSensor",
Contributor

Let's hardcode caller instead of passing it as an argument.

Comment on lines 70 to 77
databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
http_path: str | None = None,
sql_endpoint_name: str | None = None,
session_configuration=None,
http_headers: list[tuple[str, str]] | None = None,
catalog: str = "",
schema: str = "default",
table_name: str = "",
Contributor

Can we inherit most of the parameters from DatabricksSqlSensor?

Contributor Author

Yes, changed it.

self.handler = handler
self.partition_operator = partition_operator

def _get_hook(self) -> DatabricksSqlHook:
Contributor

If we pass all parameters to the DatabricksSqlSensor, then we can simply inherit that hook from it.

Contributor Author

Good point, implemented the change.

return sql_result

def _check_table_partitions(self) -> list:
if self.catalog is not None:
Contributor

It will always be executed this way because we're passing an empty string as the default. Also, we can continue to rely on two-level and even one-level naming, falling back to the default catalog & schema.

Contributor Author

Changed it.

Comment on lines 118 to 121
if len(result) >= 1:
return True
else:
return False
Contributor

this is really just return len(result) > 0

Contributor Author

Changed it.

Comment on lines 66 to 77
databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
http_path: str | None = None,
sql_endpoint_name: str | None = None,
session_configuration=None,
http_headers: list[tuple[str, str]] | None = None,
catalog: str = "",
schema: str = "default",
table_name: str = "",
handler: Callable[[Any], Any] = fetch_all_handler,
timestamp: datetime = datetime.now() - timedelta(days=7),
caller: str = "DatabricksTableChangesSensor",
client_parameters: dict[str, Any] | None = None,
Contributor

Same comments as for partition sensor

Contributor Author

Changed it.


def get_current_table_version(self, table_name, time_range):
change_sql = (
f"SELECT COUNT(version) as versions from "
Contributor

what about selecting max(version) instead of count?

Contributor Author

Agree, will change it.

change_sql = (
f"SELECT COUNT(version) as versions from "
f"(DESCRIBE HISTORY {table_name}) "
f"WHERE timestamp >= '{time_range}'"
Contributor

if time_range doesn't change between calls, we may return true all the time. Why not compare with the latest version instead?

Contributor

Does it make sense to report only data changes? Otherwise we'll get unnecessary changes, e.g. after VACUUM, OPTIMIZE, ...

Contributor Author

I am working on this, will push the changes soon.

Contributor Author

Added these changes in the recent push.

Comment on lines 135 to 139
if self.catalog is not None:
complete_table_name = str(self.catalog + "." + self.schema + "." + self.table_name)
self.log.debug("Table name generated from arguments: %s", complete_table_name)
else:
raise AirflowException("Catalog name not specified, aborting query execution.")
Contributor

Same comment as for partition sensor.

Contributor Author

Removed it.

@@ -17,6 +17,9 @@
# under the License.
Contributor Author

@alexott FYI: The Escaper class was not available on the main branch, so I added it as part of this PR to be used in our provider.
Also, I made some minor changes to exception handling in the Escaper class because it was looking for some user-defined exception classes from pyhive.

Contributor Author

How do I use the Escaper class in databricks-sql-python within Airflow? Do I add it to setup.py to be installed when Airflow starts?

Contributor Author

Thanks for the help, I imported it via from databricks.sql.utils import ParamEscaper

Contributor

@alexott alexott left a comment

There are some minor changes required, but otherwise - looks good

Comment on lines 47 to 48
:param _catalog: An optional initial catalog to use. Requires DBR version 9.0+ (templated)
:param _schema: An optional initial schema to use. Requires DBR version 9.0+ (templated)
Contributor

Why do these start with _? They then don't match the other operators.

Contributor Author

Changed it and made it consistent with other operators.

output_list.append(
f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
)
if isinstance(partition_value, (str, datetime.date)):
Contributor

What about timestamps, i.e. datetime.datetime?

Contributor Author

Changed it.

)
return self._sql_sensor(partition_sql)

def _get_results(self, context: Context) -> bool:
Contributor

Why do we need context here?

Contributor Author

We do not need it, removed it.

Databricks connection's extra parameters.
:param http_headers: An optional list of (k, v) pairs that will be set as HTTP headers on every request.
:param client_parameters: Additional parameters internal to Databricks SQL Connector parameters.
:param sql: SQL query to be executed.
Contributor

Not all parameters for __init__ are documented.

Contributor Author

Updated.

Comment on lines 162 to 163
if len(result) < 1:
raise AirflowException("Databricks SQL partition sensor failed.")
Contributor

This handles the lack of results differently from the generic sensor.

Contributor Author

Unified the handling across classes.

Comment on lines 48 to 50
:param _catalog: An optional initial catalog to use.
Requires DBR version 9.0+ (templated), defaults to ""
:param _schema: An optional initial schema to use.
Requires DBR version 9.0+ (templated), defaults to "default"
Contributor

Same here: unify the names with the base operator.

Contributor Author

Fixed.

def get_current_table_version(self, table_name, time_range, operator):
_count_describe_literal = "SELECT MAX(version) AS versions FROM (DESCRIBE HISTORY"
_filter_predicate_literal = ") WHERE timestamp"
_operation_filter_literal = "AND operation NOT LIKE '%CONVERT%' AND operation NOT LIKE '%OPTIMIZE%' \
Contributor

Wouldn't it be simpler to do operation NOT IN ('CONVERT', 'OPTIMIZE', ...)?

Contributor Author

@harishkrao harishkrao Feb 13, 2023

Yes, added FSCK and changed it to a NOT IN filter. Additionally, it is interesting to note that, after running a few commands such as FSCK and OPTIMIZE, they are not recorded in the history of the Delta table.

Contributor

Regarding VACUUM START, ..., let's try to use just a single trailing % (i.e. VACUUM%) to avoid searching by substring. A combined sketch of the query follows.
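Putting both suggestions together, the history query could end up looking roughly like this (the table name, time range, and exact operation list are illustrative only):

# Hypothetical values; in the sensor these come from the task arguments.
complete_table_name = "my_catalog.my_schema.my_table"
time_range = "2023-01-01T00:00:00"
change_sql = (
    f"SELECT MAX(version) AS versions FROM (DESCRIBE HISTORY {complete_table_name}) "
    f"WHERE timestamp >= '{time_range}' "
    "AND operation NOT IN ('CONVERT', 'FSCK') "
    "AND operation NOT LIKE 'VACUUM%' "
    "AND operation NOT LIKE 'OPTIMIZE%'"
)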

Contributor Author

Good point, changed it.

def __init__(
self,
table_name: str,
timestamp: datetime = datetime.now() - timedelta(days=7),
Contributor

What about making this optional? For example, I might want to check for changes without taking the timestamp into account.

Contributor Author

Changed.

@harishkrao
Contributor Author

There are some minor changes required, but otherwise - looks good

@alexott thank you for taking the time to review! I resolved all the comments (except the escaper class).

@harishkrao harishkrao force-pushed the databricks-sql-sensor branch 5 times, most recently from d18d4a7 to bf87dba on February 16, 2023 19:58
@eladkal
Contributor

eladkal commented Feb 17, 2023

@harishkrao does this PR solve #21381?

@alexott
Contributor

alexott commented Feb 19, 2023

@eladkal yes, it will solve #21381

Contributor

@alexott alexott left a comment

See my comment about returning False vs. throwing an exception when there are no results.

But the primary request for changes is to add the missing pieces:

  • We need documentation to be added as well
  • Documentation should include examples: add a sensor example to tests/system/providers/databricks; it will be used for integration tests

@harishkrao
Contributor Author

See my comment about returning False vs. throwing an exception when there are no results.

But the primary request for changes is to add the missing pieces:

* We need documentation to be added as well

* Documentation should include examples: add a sensor example to `tests/system/providers/databricks`; it will be used for integration tests

@alexott just pushed an example DAG file, similar to the ones for Operators.

Contributor

@eladkal eladkal left a comment

Apparently I missed things because I reviewed from my phone.
I think the code requires some more work. I'm not familiar with Databricks, but the sensors seem very complex, and I wonder for all of them whether the logic shouldn't live in the hook.

@o-nikolas @josh-fell I'd appreciate another pair of eyes here.

Comment on lines +96 to +97
def get_previous_version(context: Context, lookup_key):
return context["ti"].xcom_pull(key=lookup_key, include_prior_dates=True)
Contributor

@eladkal eladkal Feb 27, 2023

I don't understand the XCom part.
Why does the sensor push to and pull from XCom on every poke?

Contributor Author

@harishkrao harishkrao Feb 27, 2023

We store metadata about the most recently queried version for that table, and we send/receive it using XCom. After querying the current version from Databricks, we compare it with the one stored in the metadata and take an action accordingly.

Contributor

I don't believe there is a guarantee that the most recent XCom will be pulled here. Behind the scenes XCom.get_many() is called and just retrieves the first record. At the mercy of the metadatabase being used there.

Also, what happens in a mapped operator situation? If the XCom key is always the same, it seems possible this can pull an XCom key for an entirely different task since task_ids and map_index is not specified in the xcom_pull() call.

Another question then would be what if the input args are the same (i.e. checking for changes in the same table) but a user simply updates the task_id. Would this sensor yield a false positive that there was indeed a change?

I don't necessarily have answers to these questions on the top of my head, but some things to think about with using XComs in this way.
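One way to make the lookup more deterministic (a sketch only, not necessarily the final design) is to scope the pull to this task's own pushes instead of pulling by key alone:

def get_previous_version(context, lookup_key):
    ti = context["ti"]
    # Restrict the pull to this task's own XCom entries so another task
    # pushing the same key cannot be picked up by mistake.
    return ti.xcom_pull(task_ids=ti.task_id, key=lookup_key, include_prior_dates=True)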

Comment on lines +124 to +152
def _get_results_table_changes(self, context) -> bool:
complete_table_name = str(self.catalog + "." + self.schema + "." + self.table_name)
self.log.debug("Table name generated from arguments: %s", complete_table_name)

prev_version = -1
if context is not None:
lookup_key = complete_table_name
prev_data = self.get_previous_version(lookup_key=lookup_key, context=context)
self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data))
if isinstance(prev_data, int):
prev_version = prev_data
elif prev_data is not None:
raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data))
version = self.get_current_table_version(table_name=complete_table_name)
self.log.debug("Current version: %s", version)
if version is None:
return False
if prev_version < version:
result = True
else:
return False
if prev_version != version:
self.set_version(lookup_key=lookup_key, version=version, context=context)
self.log.debug("Result: %s", result)
return result
return False

def poke(self, context: Context) -> bool:
return self._get_results_table_changes(context=context)
Contributor

This looks very complicated.
A sensor should ask a simple question. Most of the logic for these operations should be functions in the hook (so they can also be utilized by other sensors or custom sensors that users create).

Contributor Author

@alexott can you please weigh in on the design decisions we made to arrive at this pattern?

Contributor

+1. Some of these functions look like they could be handy if made generally available as part of a hook.

output_list.append(
f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
)
# TODO: Check date types.
Contributor

leftover?

Contributor Author

I can remove this.

from airflow.utils.context import Context


class DatabricksSqlSensor(BaseSensorOperator):
Contributor

I'm missing something here.
If this sensor leverages DbApiHook, why doesn't it subclass SqlSensor?

Contributor Author

@harishkrao harishkrao Feb 27, 2023

I can change it to inherit the SqlSensor.

@harishkrao
Contributor Author

Apparently I missed things because I reviewed from my phone. I think the code requires some more work. I'm not familiar with Databricks, but the sensors seem very complex, and I wonder for all of them whether the logic shouldn't live in the hook.

@o-nikolas @josh-fell I'd appreciate another pair of eyes here.

@alexott can you please elaborate on the reasoning for why we wrote the sensors with this design?

) as dag:
# [docs]
connection_id = "databricks_default"
sql_endpoint_name = "Starter Warehouse"
Contributor

We usually add test code to set up the resource under test, or at least make it configurable (OS env var, etc.) so that users can set up their own and supply the correct config to test against it.

from airflow.providers.databricks.sensors.sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.table_changes import DatabricksTableChangesSensor

# [docs]
Contributor

What is the purpose of these tags?

Comment on lines +132 to +136
self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data))
if isinstance(prev_data, int):
prev_version = prev_data
elif prev_data is not None:
raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data))
Contributor

IMHO all this logic should be inside get_previous_version() rather than here.

Comment on lines +137 to +146
version = self.get_current_table_version(table_name=complete_table_name)
self.log.debug("Current version: %s", version)
if version is None:
return False
if prev_version < version:
result = True
else:
return False
if prev_version != version:
self.set_version(lookup_key=lookup_key, version=version, context=context)
Contributor

I don't think I fully understand this logic. The two False cases can certainly be collapsed to be more compact, but also, shouldn't the False case set result rather than return? If they return, the code to store the version in XCom is not executed. You're basically always comparing version with the prev_version default value of -1, from what I can tell.
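A sketch of the collapsed logic being described, reusing this PR's helper methods (get_previous_version, get_current_table_version, set_version); here the observed version is stored even when the poke ultimately returns False:

def _get_results_table_changes(self, context) -> bool:
    lookup_key = f"{self.catalog}.{self.schema}.{self.table_name}"
    prev_data = self.get_previous_version(context=context, lookup_key=lookup_key)
    prev_version = prev_data if isinstance(prev_data, int) else -1
    version = self.get_current_table_version(table_name=lookup_key)
    if version is None:
        return False
    if version != prev_version:
        # Record the new version regardless of the comparison outcome.
        self.set_version(context=context, lookup_key=lookup_key, version=version)
    return version > prev_version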

self.log.debug("Table name generated from arguments: %s", complete_table_name)

prev_version = -1
if context is not None:
Contributor

Are we really worried about context being missing?
If so, then just add a statement like:

if not context:
    return False

This way the whole main block of code doesn't have to be indented.
Also if context is really missing you may want to throw an exception instead of just returning False.

def set_version(context: Context, lookup_key, version):
context["ti"].xcom_push(key=lookup_key, value=version)

def get_current_table_version(self, table_name):
Contributor

Can any or all of this be pushed into the hook? Validating things like operators doesn't seem like the right thing for the sensor to do.

if len(partition_columns) < 1:
raise AirflowException("Table %s does not have partitions", table_name)
formatted_opts = ""
if opts is not None and len(opts) > 0:
Contributor

Suggested change
if opts is not None and len(opts) > 0:
if opts:

from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
Contributor

Since this import is only used for typing, it should be put behind typing.TYPE_CHECKING. One fewer import at runtime. Applicable to all of the other net-new modules in this PR too.
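For example, the standard pattern would look like this in each of the new modules:

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Needed only for type hints, so it is not imported at runtime.
    from airflow.utils.context import Context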

Comment on lines +139 to +141
if len(result) < 1:
return False
return True
Contributor

Suggested change
if len(result) < 1:
return False
return True
return bool(result)

Small optimization.

Comment on lines +78 to +99
"""Sensor to execute SQL statements on a Delta table via Databricks.

:param databricks_conn_id: Reference to :ref:`Databricks
connection id<howto/connection:databricks>` (templated), defaults to
DatabricksSqlHook.default_conn_name
:param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
If not specified, it should be either specified in the Databricks connection's
extra parameters, or ``sql_endpoint_name`` must be specified.
:param sql_endpoint_name: Optional name of Databricks SQL Endpoint. If not specified, ``http_path``
must be provided as described above, defaults to None
:param session_configuration: An optional dictionary of Spark session parameters. If not specified,
it could be specified in the Databricks connection's extra parameters., defaults to None
:param http_headers: An optional list of (k, v) pairs
that will be set as HTTP headers on every request. (templated).
:param catalog: An optional initial catalog to use.
Requires DBR version 9.0+ (templated), defaults to ""
:param schema: An optional initial schema to use.
Requires DBR version 9.0+ (templated), defaults to "default"
:param sql: SQL statement to be executed.
:param handler: Handler for DbApiHook.run() to return results, defaults to fetch_all_handler
:param client_parameters: Additional parameters internal to Databricks SQL Connector parameters.
"""
Contributor

There are two sets of docstrings for this sensor's construction. Can you consolidate please?

defaults to >=.
"""

template_fields: Sequence[str] = ("databricks_conn_id", "catalog", "schema", "table_name")
Contributor

IMO it would be useful to have timestamp as a template field too. I could foresee users wanting to use one of the built-in Jinja variables for this so the task is idempotent (like {{ data_interval_start }}, for example), or to have it be a dynamic input from a previous task.
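Illustrative only (the task id and connection settings are hypothetical, the import path follows this PR's current revision, and this assumes timestamp is added to template_fields): a DAG author could then tie the check to the run's own data interval:

from airflow.providers.databricks.sensors.table_changes import DatabricksTableChangesSensor

wait_for_changes = DatabricksTableChangesSensor(
    task_id="wait_for_table_changes",
    databricks_conn_id="databricks_default",
    sql_endpoint_name="Starter Warehouse",
    table_name="my_catalog.my_schema.my_table",
    timestamp="{{ data_interval_start }}",
)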

Comment on lines +80 to +87
def _sql_sensor(self, sql):
hook = self._get_hook()
sql_result = hook.run(
sql,
handler=self.handler if self.do_xcom_push else None,
)
return sql_result

Contributor

Suggested change
def _sql_sensor(self, sql):
hook = self._get_hook()
sql_result = hook.run(
sql,
handler=self.handler if self.do_xcom_push else None,
)
return sql_result

Same idea here. This method exists in DatabricksSqlSensor.

Comment on lines +152 to +153
def poke(self, context: Context) -> bool:
return self._get_results()
Contributor

Technically could remove this too.

)


class TestDatabricksPartitionSensor(unittest.TestCase):
Contributor

There is an ongoing effort to move away from unittest in favor of pytest. Since these tests are net-new, could you change this and the other tests in the PR to pytest please?
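A minimal sketch of the pytest style (the module path and constructor arguments are taken from this PR's current revision and may change):

import pytest

from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor


@pytest.fixture
def partition_sensor():
    return DatabricksPartitionSensor(
        task_id="test_partition_sensor",
        table_name="my_table",
        partition_name={"date": "2023-01-01"},
    )


def test_partition_sensor_init(partition_sensor):
    assert partition_sensor.table_name == "my_table"
    assert partition_sensor.partition_name == {"date": "2023-01-01"}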

connection_id = "databricks_default"
sql_endpoint_name = "Starter Warehouse"

# [START howto_sensor_databricks_sql]
Contributor

These START/END markers are used to include code snippets in guides (generally). It would be great if there were accompanying documentation for these new sensors that takes advantage of the snippets outlined in this DAG. There are many examples throughout the providers' operator guides of how this is done.

partition_columns = self._sql_sensor(f"DESCRIBE DETAIL {table_name}")[0][7]
self.log.info("table_info: %s", partition_columns)
if len(partition_columns) < 1:
raise AirflowException("Table %s does not have partitions", table_name)
Contributor

Suggested change
raise AirflowException("Table %s does not have partitions", table_name)
raise AirflowException(f"Table {table_name} does not have partitions")

Otherwise the message logged will not be what you expect.

# TODO: Check date types.
else:
raise AirflowException(
"Column %s not part of table partitions: %s", partition_col, partition_columns
Contributor

Suggested change
"Column %s not part of table partitions: %s", partition_col, partition_columns
f"Column {partition_col} not part of table partitions: {partition_columns}"

Same here.

@harishkrao
Contributor Author

@eladkal @o-nikolas @josh-fell @alexott thanks for taking the time to review my code and provide feedback; I appreciate it. To incorporate the changes for the 3 sensors, I will break them down into 3 separate PRs so that they are easier to manage and test individually, rather than one bulk of changes in a single PR.
I will work on the changes and open the new PRs.
I will close the currently open PR.
