Sensor for Databricks partition and table changes #28950
Conversation
@alexott it would be great if you could review the PR and suggest feedback. Thank you for your time.
Thank you for your contribution!
I would make the DatabricksSqlSensor really generic by allowing an arbitrary SQL expression to be passed, triggering the sensor when it returns a non-empty result. On top of that we can then build the Partition & History change sensors.
Also we need:
- Documentation
- The sensor should be declared in airflow/providers/databricks/provider.yaml
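The generic design described in the review comment could be sketched roughly like this (a minimal illustration, not the actual provider code; `run_sql` stands in for whatever executes the query via the hook):

```python
class GenericSqlSensorSketch:
    """Minimal sketch: poke() returns True as soon as the SQL yields any rows."""

    def __init__(self, sql: str, run_sql) -> None:
        self.sql = sql
        self.run_sql = run_sql  # callable: str -> list of result rows

    def poke(self) -> bool:
        # A non-empty result set means the condition the sensor waits for is met.
        return bool(self.run_sql(self.sql))
```

Partition and table-change sensors can then build their specific SQL on top of this poke contract.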
""" | ||
Generic Databricks SQL sensor. | ||
|
||
:param databricks_conn_id:str=DatabricksSqlHook.default_conn_name: Specify the name of the connection |
You don't need to include the type in the docstring; just use :param {argname}: {description}. All information about expected types comes from the annotations. Including it may cause issues when the documentation is generated.
Removed the type. Thanks for the feedback.
Args:
    context (Context): Airflow context
    lookup_key (_type_): Unique lookup key used to store values related to a specific table.

Returns:
    int: Version number
We don't use Google style in docstrings; please use reStructuredText.
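For reference, a hedged sketch of the same docstring rewritten in the reStructuredText style used across Airflow (the function name and placeholder body are illustrative, not taken from the PR; types come from annotations rather than the docstring):

```python
def get_version(context, lookup_key):
    """
    Return the stored version for a specific table.

    :param context: Airflow context
    :param lookup_key: Unique lookup key used to store values related to a specific table.
    :return: Version number
    """
    # Placeholder body for illustration only.
    return context.get(lookup_key, 0)
```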
Changed it.
partition_name: dict = {"date": "2023-1-1"},
handler: Callable[[Any], Any] = fetch_all_handler,
db_sensor_type: str,
timestamp: datetime = datetime.now() - timedelta(days=7),
I think we do not need any default value for timestamp. If it is a mandatory field, just make it mandatory and the user should provide the actual value here.
The reason I thought it needs some default value is to extract history for a time period irrespective of a custom value provided by the user, for example the past 7 days.
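One caveat with a default like `datetime.now() - timedelta(days=7)`: in Python, a default-argument expression is evaluated once at import time, not per poke. A hedged sketch of resolving the 7-day default lazily instead (class and method names are illustrative, not from the PR):

```python
from __future__ import annotations

from datetime import datetime, timedelta


class TableChangesSensorSketch:
    """Illustrative sketch: ``timestamp`` is optional, and the 7-day default
    is computed at call time rather than when the module is imported."""

    def __init__(self, timestamp: datetime | None = None) -> None:
        self.timestamp = timestamp

    def effective_timestamp(self) -> datetime:
        # Fall back to "the past 7 days" only when the user gave no value.
        return self.timestamp or (datetime.now() - timedelta(days=7))
```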
Also, it makes sense to declare some of the properties as templated, for example the partitions mapping, etc.
Also, for the partition sensor it would make sense to allow specifying operations on the partitions, i.e. to allow not only equality.
@alexott agree, that would be good to implement. Do you have an example to follow for this one?
Yes, just look into DatabricksSqlOperator - it allows templating.
Force-pushed 9b1b051 to 968e8d1
Force-pushed 73953ce to 0030fc4
@alexott @Taragolis Thank you for taking the time to review the PR and for giving valuable feedback. I have addressed all of the comments. It would be great if you could review the changes. Thank you again!
Force-pushed 6a9e11b to 7781bd7
- integration-name: Databricks SQL
  python-modules:
    - airflow.providers.databricks.sensors.databricks_sql
- integration-name: Databricks Partition
  python-modules:
    - airflow.providers.databricks.sensors.databricks_partition
- integration-name: Databricks Table Changes
  python-modules:
    - airflow.providers.databricks.sensors.databricks_table_changes
We can combine all of them under the Databricks SQL umbrella.
table_name: str = "",
partition_name: dict,
handler: Callable[[Any], Any] = fetch_one_handler,
caller: str = "DatabricksPartitionSensor",
Let's hardcode caller instead of passing it as an argument.
databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
http_path: str | None = None,
sql_endpoint_name: str | None = None,
session_configuration=None,
http_headers: list[tuple[str, str]] | None = None,
catalog: str = "",
schema: str = "default",
table_name: str = "",
Can we inherit most of the parameters from DatabricksSqlSensor?
Yes, changed it.
self.handler = handler
self.partition_operator = partition_operator

def _get_hook(self) -> DatabricksSqlHook:
If we pass all parameters to the DatabricksSqlSensor, then we can simply inherit that hook from it.
Good point, implemented the change.
    return sql_result

def _check_table_partitions(self) -> list:
    if self.catalog is not None:
It will always be executed this way because we're passing an empty string as the default. Also, we can continue to rely on two-level and even one-level naming, relying on the default catalog & default schema.
Changed it.
if len(result) >= 1:
    return True
else:
    return False
This is really just return len(result) > 0.
Changed it.
databricks_conn_id: str = DatabricksSqlHook.default_conn_name,
http_path: str | None = None,
sql_endpoint_name: str | None = None,
session_configuration=None,
http_headers: list[tuple[str, str]] | None = None,
catalog: str = "",
schema: str = "default",
table_name: str = "",
handler: Callable[[Any], Any] = fetch_all_handler,
timestamp: datetime = datetime.now() - timedelta(days=7),
caller: str = "DatabricksTableChangesSensor",
client_parameters: dict[str, Any] | None = None,
Same comments as for the partition sensor.
Changed it.

def get_current_table_version(self, table_name, time_range):
    change_sql = (
        f"SELECT COUNT(version) as versions from "
What about selecting max(version) instead of count?
Agree, will change it.
change_sql = (
    f"SELECT COUNT(version) as versions from "
    f"(DESCRIBE HISTORY {table_name}) "
    f"WHERE timestamp >= '{time_range}'"
If time_range doesn't change between calls, we may return true all the time. Why not compare with the latest version instead?
Does it make sense to report only data changes? Otherwise we'll get unnecessary changes, like after VACUUM, OPTIMIZE, ...
I am working on this, will push the changes soon.
Added these changes in the recent push.
if self.catalog is not None:
    complete_table_name = str(self.catalog + "." + self.schema + "." + self.table_name)
    self.log.debug("Table name generated from arguments: %s", complete_table_name)
else:
    raise AirflowException("Catalog name not specified, aborting query execution.")
Same comment as for the partition sensor.
Removed it.
@@ -17,6 +17,9 @@
# under the License.
@alexott FYI: The Escaper class was not available on the main branch, so I added it as part of this PR to be used in our provider.
Also, I made some minor changes to exception handling in the Escaper class because it was looking for some user-defined exception classes from pyhive.
Hmmm, it should be a part of databricks-sql-connector: https://github.com/databricks/databricks-sql-python/blob/main/src/databricks/sql/utils.py#L121
How do I use the Escaper class from databricks-sql-python within Airflow? Do I add it to setup.py to be installed when Airflow starts?
Thanks for the help, I imported it via from databricks.sql.utils import ParamEscaper.
There are some minor changes required, but otherwise it looks good.
:param _catalog: An optional initial catalog to use. Requires DBR version 9.0+ (templated)
:param _schema: An optional initial schema to use. Requires DBR version 9.0+ (templated)
Why do the names start with _? It then doesn't match other operators.
Changed it and made it consistent with other operators.
output_list.append(
    f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
)
if isinstance(partition_value, (str, datetime.date)):
What about timestamps, aka datetime.datetime?
Changed it.
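Worth noting for the isinstance check discussed above: datetime.datetime is a subclass of datetime.date, so a check against datetime.date already matches timestamp values; an explicit datetime.datetime check only matters when dates and timestamps must be escaped differently. A tiny demonstration:

```python
import datetime

# datetime.datetime subclasses datetime.date, so a date-only isinstance
# check already matches timestamps.
ts = datetime.datetime(2023, 1, 1, 12, 30)
d = datetime.date(2023, 1, 1)

assert isinstance(ts, datetime.date)         # True via the subclass relationship
assert isinstance(ts, datetime.datetime)
assert not isinstance(d, datetime.datetime)  # a plain date is not a datetime
```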
    )
    return self._sql_sensor(partition_sql)

def _get_results(self, context: Context) -> bool:
Why do we need context here?
We do not need it; removed it.
    Databricks connection's extra parameters.
:param http_headers: An optional list of (k, v) pairs that will be set as HTTP headers on every request.
:param client_parameters: Additional parameters internal to Databricks SQL Connector parameters.
:param sql: SQL query to be executed.
Not all parameters for __init__ are documented.
Updated.
if len(result) < 1:
    raise AirflowException("Databricks SQL partition sensor failed.")
This handles a lack of results differently than the generic sensor.
Unified the handling across classes.
:param _catalog: An optional initial catalog to use.
    Requires DBR version 9.0+ (templated), defaults to ""
:param _schema: An optional initial schema to use.
    Requires DBR version 9.0+ (templated), defaults to "default"
Same here - unify the names with the base operator.
Fixed.
def get_current_table_version(self, table_name, time_range, operator):
    _count_describe_literal = "SELECT MAX(version) AS versions FROM (DESCRIBE HISTORY"
    _filter_predicate_literal = ") WHERE timestamp"
    _operation_filter_literal = "AND operation NOT LIKE '%CONVERT%' AND operation NOT LIKE '%OPTIMIZE%' \
Could it be simpler to do operation NOT IN ('CONVERT', 'OPTIMIZE', ...)?
Also, we need to add FSCK. See the full list here: https://docs.delta.io/latest/delta-utility.html#retrieve-delta-table-history
Yes, added FSCK and changed it to a NOT IN filter. Additionally, it is interesting to note that, on running a few commands like FSCK and OPTIMIZE, they are not recorded in the history of the Delta table.
Regarding VACUUM START, ... let's try to use just a single % -> VACUUM% to avoid searching by substring.
Good point, changed it.
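Putting the two suggestions together, the filtered history query might look roughly like this (a sketch only; the exact literals and excluded operations in the PR may differ):

```python
def build_history_query(table_name: str, time_range: str) -> str:
    """Sketch: latest data-changing version of a Delta table since time_range.

    Combines the NOT IN filter for exact operation names with a single
    'VACUUM%' LIKE pattern, as discussed in the review.
    """
    return (
        f"SELECT MAX(version) AS versions FROM (DESCRIBE HISTORY {table_name}) "
        f"WHERE timestamp >= '{time_range}' "
        "AND operation NOT IN ('CONVERT', 'OPTIMIZE', 'FSCK') "
        "AND operation NOT LIKE 'VACUUM%'"
    )
```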
def __init__(
    self,
    table_name: str,
    timestamp: datetime = datetime.now() - timedelta(days=7),
What about making this optional? For example, I want to check changes without taking the timestamp into account.
Changed.
@alexott thank you for taking the time to review! I resolved all the comments (except the escaper class).
Force-pushed d18d4a7 to bf87dba
@harishkrao does this PR solve #21381?
Force-pushed bf87dba to d5dd505
See my comment about returning false vs. throwing an exception when there are no results.
But the primary request for changes is to add the missing pieces:
- We need documentation to be added as well
- Documentation should include examples - add a sensor example to tests/system/providers/databricks - it will be used for integration tests
@alexott just pushed an example DAG file, similar to the ones for Operators.
Force-pushed d73e516 to 656814e
Apparently I missed stuff because I reviewed from my phone.
I think the code requires some more work. I'm not familiar with Databricks, but the sensors seem very complex, and I wonder for all of them whether the logic shouldn't be in the hook.
@o-nikolas @josh-fell I'd appreciate another eye here.
def get_previous_version(context: Context, lookup_key):
    return context["ti"].xcom_pull(key=lookup_key, include_prior_dates=True)
I don't understand the XCom part. Why does the sensor push and pull from XCom on every poke?
We store metadata on the most recently queried version for that table, and we send/receive it using XCom. On querying the current version from Databricks, we compare it with the one stored in the metadata and act accordingly.
I don't believe there is a guarantee that the most recent XCom will be pulled here. Behind the scenes XCom.get_many() is called and just retrieves the first record, so you are at the mercy of the metadatabase being used there.
Also, what happens in a mapped-operator situation? If the XCom key is always the same, it seems possible this can pull an XCom key for an entirely different task, since task_ids and map_index are not specified in the xcom_pull() call.
Another question then would be: what if the input args are the same (i.e. checking for changes in the same table) but a user simply updates the task_id? Would this sensor yield a false positive that there was indeed a change?
I don't necessarily have answers to these questions off the top of my head, but these are some things to think about with using XComs in this way.
def _get_results_table_changes(self, context) -> bool:
    complete_table_name = str(self.catalog + "." + self.schema + "." + self.table_name)
    self.log.debug("Table name generated from arguments: %s", complete_table_name)

    prev_version = -1
    if context is not None:
        lookup_key = complete_table_name
        prev_data = self.get_previous_version(lookup_key=lookup_key, context=context)
        self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data))
        if isinstance(prev_data, int):
            prev_version = prev_data
        elif prev_data is not None:
            raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data))
        version = self.get_current_table_version(table_name=complete_table_name)
        self.log.debug("Current version: %s", version)
        if version is None:
            return False
        if prev_version < version:
            result = True
        else:
            return False
        if prev_version != version:
            self.set_version(lookup_key=lookup_key, version=version, context=context)
        self.log.debug("Result: %s", result)
        return result
    return False

def poke(self, context: Context) -> bool:
    return self._get_results_table_changes(context=context)
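One way to make the branching in the hunk above easier to reason about is to factor the comparison into a pure helper; a hedged sketch (the function name and tuple contract are illustrative, not from the PR) that returns both the poke result and whether the new version should be stored, so the store step is reached in every case:

```python
def evaluate_version(prev_version, version):
    """Return (poke_result, store_version) for one poke cycle.

    Illustrative sketch only: collapses the two False branches and keeps
    the "store new version" decision reachable on every code path.
    """
    if version is None:
        # No current version could be determined: nothing to compare or store.
        return False, False
    return prev_version < version, prev_version != version
```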
This looks very complicated.
A sensor should ask a simple question. Most of the logic for the operations should be functions in the hook (so they can also be utilized by other sensors or custom sensors users can create).
@alexott can you please weigh in on the design decisions we made to arrive at this pattern?
+1. Some of these functions look like they could be handy if made generally available as part of a hook.
output_list.append(
    f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
)
# TODO: Check date types.
Leftover?
I can remove this.
from airflow.utils.context import Context


class DatabricksSqlSensor(BaseSensorOperator):
I'm missing something here. If this sensor leverages DbApiHook, why doesn't it subclass SqlSensor?
I can change it to inherit from SqlSensor.
@alexott can you please elaborate on the reasoning behind why we wrote the sensors with this design?
) as dag:
    # [docs]
    connection_id = "databricks_default"
    sql_endpoint_name = "Starter Warehouse"
We usually add test code to set up the resource under test, or at least make it configurable (OS env var, etc.) so that users can set up their own and supply the correct config to test against it.
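One way to make the example DAG's resources configurable, as suggested (the environment variable names here are made up for illustration; the fallbacks are the values hardcoded in the example):

```python
import os

# Fall back to the hardcoded demo values when the env vars are not set,
# so users can point the system test at their own workspace.
connection_id = os.environ.get("DATABRICKS_CONN_ID", "databricks_default")
sql_endpoint_name = os.environ.get("DATABRICKS_SQL_ENDPOINT", "Starter Warehouse")
```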
from airflow.providers.databricks.sensors.sql import DatabricksSqlSensor
from airflow.providers.databricks.sensors.table_changes import DatabricksTableChangesSensor

# [docs]
What is the purpose of these tags?
self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data))
if isinstance(prev_data, int):
    prev_version = prev_data
elif prev_data is not None:
    raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data))
IMHO all this logic should be inside get_previous_version() rather than here.
version = self.get_current_table_version(table_name=complete_table_name)
self.log.debug("Current version: %s", version)
if version is None:
    return False
if prev_version < version:
    result = True
else:
    return False
if prev_version != version:
    self.set_version(lookup_key=lookup_key, version=version, context=context)
I don't think I fully understand this logic. The two False cases can certainly be collapsed to be more compact, but also, shouldn't the false case be setting result rather than returning? If they return, then the code to store the version in XCom is not executed. You're basically always comparing version with the prev_version default value of -1, from what I can tell.
self.log.debug("Table name generated from arguments: %s", complete_table_name)

prev_version = -1
if context is not None:
Are we really worried about context being missing? If so, then just add a statement like:

if not context:
    return False

This way the whole main block of code doesn't have to be indented. Also, if context is really missing, you may want to throw an exception instead of just returning False.
def set_version(context: Context, lookup_key, version):
    context["ti"].xcom_push(key=lookup_key, value=version)


def get_current_table_version(self, table_name):
Can any or all of this be pushed into the hook? Validating things like operators doesn't seem like the right thing to do here.
if len(partition_columns) < 1:
    raise AirflowException("Table %s does not have partitions", table_name)
formatted_opts = ""
if opts is not None and len(opts) > 0:
Suggested change:
- if opts is not None and len(opts) > 0:
+ if opts:
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
Since this import is only used for typing, it should be put behind typing.TYPE_CHECKING. One fewer import at runtime. Applicable to all of the other net-new modules in this PR too.
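The suggested pattern, sketched below (the airflow import and the poke function are illustrative; the guarded import is never executed at runtime because TYPE_CHECKING is False outside of type checkers, and the `__future__` import makes the annotation a string so it need not resolve):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only while type checking; no runtime cost.
    from airflow.utils.context import Context


def poke(context: Context) -> bool:
    # Annotation is resolved lazily, so this runs even without airflow installed.
    return bool(context)
```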
if len(result) < 1:
    return False
return True
Suggested change:
- if len(result) < 1:
-     return False
- return True
+ return bool(result)
Small optimization.
"""Sensor to execute SQL statements on a Delta table via Databricks. | ||
|
||
:param databricks_conn_id: Reference to :ref:`Databricks | ||
connection id<howto/connection:databricks>` (templated), defaults to | ||
DatabricksSqlHook.default_conn_name | ||
:param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster. | ||
If not specified, it should be either specified in the Databricks connection's | ||
extra parameters, or ``sql_endpoint_name`` must be specified. | ||
:param sql_endpoint_name: Optional name of Databricks SQL Endpoint. If not specified, ``http_path`` | ||
must be provided as described above, defaults to None | ||
:param session_configuration: An optional dictionary of Spark session parameters. If not specified, | ||
it could be specified in the Databricks connection's extra parameters., defaults to None | ||
:param http_headers: An optional list of (k, v) pairs | ||
that will be set as HTTP headers on every request. (templated). | ||
:param catalog: An optional initial catalog to use. | ||
Requires DBR version 9.0+ (templated), defaults to "" | ||
:param schema: An optional initial schema to use. | ||
Requires DBR version 9.0+ (templated), defaults to "default" | ||
:param sql: SQL statement to be executed. | ||
:param handler: Handler for DbApiHook.run() to return results, defaults to fetch_all_handler | ||
:param client_parameters: Additional parameters internal to Databricks SQL Connector parameters. | ||
""" |
There are two sets of docstrings for this sensor's constructor. Can you consolidate, please?
def _get_results_table_changes(self, context) -> bool: | ||
complete_table_name = str(self.catalog + "." + self.schema + "." + self.table_name) | ||
self.log.debug("Table name generated from arguments: %s", complete_table_name) | ||
|
||
prev_version = -1 | ||
if context is not None: | ||
lookup_key = complete_table_name | ||
prev_data = self.get_previous_version(lookup_key=lookup_key, context=context) | ||
self.log.debug("prev_data: %s, type=%s", str(prev_data), type(prev_data)) | ||
if isinstance(prev_data, int): | ||
prev_version = prev_data | ||
elif prev_data is not None: | ||
raise AirflowException("Incorrect type for previous XCom data: %s", type(prev_data)) | ||
version = self.get_current_table_version(table_name=complete_table_name) | ||
self.log.debug("Current version: %s", version) | ||
if version is None: | ||
return False | ||
if prev_version < version: | ||
result = True | ||
else: | ||
return False | ||
if prev_version != version: | ||
self.set_version(lookup_key=lookup_key, version=version, context=context) | ||
self.log.debug("Result: %s", result) | ||
return result | ||
return False | ||
|
||
def poke(self, context: Context) -> bool: | ||
return self._get_results_table_changes(context=context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Some of these functions look like they could be handy if made generally available as part of a hook.
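As a sketch of what lifting this onto the hook could look like — everything below is illustrative (a fake hook stands in for `DatabricksSqlHook` so it runs without a connection); only the `DESCRIBE HISTORY` statement is real Databricks SQL:

```python
# Hypothetical hook-level helper; FakeHook stands in for DatabricksSqlHook
# so the sketch runs without a Databricks connection.
class FakeHook:
    def __init__(self, rows):
        self._rows = rows  # canned query results

    def run(self, sql, handler=None):
        return self._rows

    def get_table_version(self, table_name: str):
        # DESCRIBE HISTORY returns the newest version in the first row/column.
        rows = self.run(f"DESCRIBE HISTORY {table_name} LIMIT 1")
        return rows[0][0] if rows else None


hook = FakeHook(rows=[(5, "2023-01-15", "WRITE")])
print(hook.get_table_version("demo.sales"))  # 5
```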
defaults to >=.
"""

template_fields: Sequence[str] = ("databricks_conn_id", "catalog", "schema", "table_name")
IMO it would be useful to have `timestamp` as a template field too. I could foresee users wanting to use one of the built-in Jinja variables (like `{{ data_interval_start }}`, for example) so the task is idempotent, or have it be used as a dynamic input from a previous task.
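A runnable sketch of why templating `timestamp` helps: Airflow renders the Jinja expression before `poke()` runs, so each data interval filters on its own value. The function and column names below are illustrative, not the PR's actual internals:

```python
def build_partition_query(table_name: str, timestamp: str) -> str:
    # With timestamp="{{ data_interval_start }}", each DAG run gets its own
    # rendered value, making the partition check idempotent per interval.
    return f"SELECT 1 FROM {table_name} WHERE event_date >= '{timestamp}'"


# What "{{ data_interval_start }}" might render to for one run:
rendered = "2023-01-15T00:00:00+00:00"
print(build_partition_query("demo.sales", rendered))
```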
def _sql_sensor(self, sql):
    hook = self._get_hook()
    sql_result = hook.run(
        sql,
        handler=self.handler if self.do_xcom_push else None,
    )
    return sql_result
Same idea here. This method exists in DatabricksSqlSensor.
def poke(self, context: Context) -> bool:
    return self._get_results()
Technically could remove this too.
def get_previous_version(context: Context, lookup_key):
    return context["ti"].xcom_pull(key=lookup_key, include_prior_dates=True)
I don't believe there is a guarantee that the most recent XCom will be pulled here. Behind the scenes `XCom.get_many()` is called and just retrieves the first record, so you're at the mercy of the metadatabase being used there.

Also, what happens in a mapped-operator situation? If the XCom key is always the same, it seems possible this can pull an XCom value from an entirely different task, since `task_ids` and `map_index` are not specified in the `xcom_pull()` call.

Another question then would be: what if the input args are the same (i.e. checking for changes in the same table) but a user simply updates the `task_id`? Would this sensor yield a false positive that there was indeed a change?

I don't necessarily have answers to these questions off the top of my head, but they are some things to think about when using XComs in this way.
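One way to reduce the collision risk (a sketch of a possible hardening, not the PR's current behavior) is to namespace the lookup key by DAG/task identity, so two tasks watching the same table never share an XCom key:

```python
def build_lookup_key(dag_id: str, task_id: str, table_name: str, map_index: int = -1) -> str:
    # Including dag_id/task_id (and map_index for mapped tasks) avoids
    # cross-task collisions when the table name alone is used as the key.
    return f"{dag_id}.{task_id}.{map_index}.{table_name}"


print(build_lookup_key("my_dag", "watch_sales", "main.default.sales"))
# my_dag.watch_sales.-1.main.default.sales
```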
)


class TestDatabricksPartitionSensor(unittest.TestCase):
There is an ongoing effort to move away from `unittest` in favor of `pytest`. Since these tests are net-new, could you change this and the other tests in the PR to `pytest` please?
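For reference, a minimal pytest-style shape (a plain class with no `unittest.TestCase` base and bare `assert`s); the sensor is mocked here so the sketch is self-contained:

```python
from unittest.mock import MagicMock


class TestDatabricksPartitionSensorPytestStyle:
    def setup_method(self):
        # pytest calls setup_method before each test; no TestCase base needed.
        self.sensor = MagicMock()
        self.sensor.poke.return_value = True

    def test_poke_returns_true(self):
        assert self.sensor.poke({}) is True
```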
connection_id = "databricks_default"
sql_endpoint_name = "Starter Warehouse"

# [START howto_sensor_databricks_sql]
These `START`/`END` markers are used to include code snippets in guides (generally). It would be great if there were accompanying documentation for these new sensors that takes advantage of the snippets being outlined in this DAG. There are a lot of examples in operator guides throughout the providers on how this is done.
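For context, the provider guides typically pull these snippets in via Sphinx's `exampleinclude` directive keyed off the markers; the path below is an assumption about where the example DAG would live:

```rst
.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sensors.py
    :language: python
    :start-after: [START howto_sensor_databricks_sql]
    :end-before: [END howto_sensor_databricks_sql]
```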
partition_columns = self._sql_sensor(f"DESCRIBE DETAIL {table_name}")[0][7]
self.log.info("table_info: %s", partition_columns)
if len(partition_columns) < 1:
    raise AirflowException("Table %s does not have partitions", table_name)
raise AirflowException("Table %s does not have partitions", table_name)
raise AirflowException(f"Table {table_name} does not have partitions")

Otherwise the message logged will not be what you expect.
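A runnable illustration of the difference — `%`-style args are stored on the exception rather than interpolated (shown with the built-in `Exception`, which behaves the same way for this purpose):

```python
# %-style args are kept as a tuple; str() shows the tuple, not a message.
bad = Exception("Table %s does not have partitions", "my_table")
print(str(bad))   # ('Table %s does not have partitions', 'my_table')

# An f-string interpolates up front, so the logged message reads correctly.
good = Exception(f"Table {'my_table'} does not have partitions")
print(str(good))  # Table my_table does not have partitions
```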
# TODO: Check date types.
else:
    raise AirflowException(
        "Column %s not part of table partitions: %s", partition_col, partition_columns
"Column %s not part of table partitions: %s", partition_col, partition_columns
f"Column {partition_col} not part of table partitions: {partition_columns}"

Same here.
@eladkal @o-nikolas @josh-fell @alexott thanks for taking the time to review my code and provide feedback. I appreciate it. To incorporate the changes for the 3 sensors, I will break these down into 3 separate PRs so that it is easy to manage and test them individually, rather than a bulk of changes in a single PR.
Closes: #21381
Sensors for Databricks SQL to detect table partitions and new table events.