Fix MetastoreHivePartitionSensor failing due to duplicate aliases #45001

Merged: 3 commits, Dec 18, 2024
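The PR description is not included here, but the title and the diff below point at the root cause: the sensor's metadata query used SELECT * over the PARTITIONS/TBLS join, which returns the TBL_ID column from both tables, and a result set with duplicate column aliases makes the query fail. A minimal sketch of that effect, using Python's sqlite3 purely as a stand-in (the table definitions below are simplified assumptions, not the real Hive metastore schema):

```python
import sqlite3

# Toy stand-ins for the Hive metastore's PARTITIONS and TBLS tables.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE TBLS (TBL_ID INTEGER, TBL_NAME TEXT, TBL_TYPE TEXT);
    CREATE TABLE PARTITIONS (PART_ID INTEGER, TBL_ID INTEGER, PART_NAME TEXT);
    INSERT INTO TBLS VALUES (1, 'test_table', 'EXTERNAL_TABLE');
    INSERT INTO PARTITIONS VALUES (10, 1, 'column=value');
    """
)

# SELECT * over the join repeats TBL_ID, i.e. duplicate aliases in the result.
star = conn.execute(
    "SELECT * FROM PARTITIONS INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID"
)
print([col[0] for col in star.description])
# ['PART_ID', 'TBL_ID', 'PART_NAME', 'TBL_ID', 'TBL_NAME', 'TBL_TYPE']

# Selecting explicit columns, as the fix does, keeps every result name unique.
explicit = conn.execute(
    "SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME"
    " FROM PARTITIONS INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID"
)
print([col[0] for col in explicit.description])
# ['PART_ID', 'TBL_ID', 'PART_NAME', 'TBL_TYPE', 'TBL_NAME']
```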
@@ -60,11 +60,16 @@ def get_dataproc_metastore_client_v1beta(self):

     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Wait for long-lasting operation to complete."""
+        self.log.info("Waiting for operation (timeout: %s seconds)", timeout)
+
         try:
-            return operation.result(timeout=timeout)
-        except Exception:
+            result = operation.result(timeout=timeout)
+            self.log.info("Operation completed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Operation failed: %s", str(e))
             error = operation.exception(timeout=timeout)
-            raise AirflowException(error)
+            raise AirflowException(f"Operation failed: {error}")

     @GoogleBaseHook.fallback_to_default_project_id
     def create_backup(
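Aside from the query change, the hunk above also tightens wait_for_operation. As a standalone sketch (a stub operation object and a plain function outside the hook class, names chosen only for illustration), the error path now surfaces the operation's recorded exception so the task log shows what actually failed:

```python
from airflow.exceptions import AirflowException


class StubOperation:
    """Illustrative stand-in for google.api_core.operation.Operation."""

    def result(self, timeout=None):
        raise RuntimeError("metastore backend rejected the request")

    def exception(self, timeout=None):
        return RuntimeError("metastore backend rejected the request")


def wait_for_operation(operation, timeout=None):
    # Same shape as the hook method: return the result if the operation
    # succeeded, otherwise wrap its exception in an AirflowException.
    try:
        return operation.result(timeout=timeout)
    except Exception:
        error = operation.exception(timeout=timeout)
        raise AirflowException(f"Operation failed: {error}")


# wait_for_operation(StubOperation())
# -> AirflowException: Operation failed: metastore backend rejected the request
```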
@@ -669,23 +674,37 @@ def list_hive_partitions(
         # because dictionaries are ordered since Python 3.7+
         _partitions = list(dict.fromkeys(partition_names)) if partition_names else []

-        query = f"""
-            SELECT *
-            FROM PARTITIONS
-            INNER JOIN TBLS
-            ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-            WHERE
-                TBLS.TBL_NAME = '{table}'"""
         if _partitions:
-            query += f"""
-            AND PARTITIONS.PART_NAME IN ({', '.join(f"'{p}'" for p in _partitions)})"""
-        query += ";"
-
-        client = self.get_dataproc_metastore_client_v1beta()
-        result = client.query_metadata(
-            request={
-                "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
-                "query": query,
-            }
-        )
-        return result
+            partition_list = ", ".join(f"'{p}'" for p in _partitions)
+            query = f"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{table}'
+                AND PARTITIONS.PART_NAME IN ({partition_list});"""
+        else:
+            query = f"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{table}';"""
+
+        request = {
+            "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
+            "query": query,
+        }
+
+        self.log.info("Prepared request:")
+        self.log.info(request)
+
+        # Execute query
+        try:
+            self.log.info("Getting Dataproc Metastore client (v1beta)...")
+            client = self.get_dataproc_metastore_client_v1beta()
+            self.log.info("Executing query_metadata...")
+            result = client.query_metadata(request=request)
+            self.log.info("Query executed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Error executing query_metadata: %s", str(e))
+            raise
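For orientation, this is roughly what list_hive_partitions now sends to query_metadata; the table and partition values below are made up for illustration and are not taken from the PR:

```python
# Illustrative inputs only.
table = "sales"
_partitions = ["ds=2024-12-01", "ds=2024-12-02"]

partition_list = ", ".join(f"'{p}'" for p in _partitions)
query = f"""
    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
    FROM PARTITIONS
    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
    WHERE TBLS.TBL_NAME = '{table}'
    AND PARTITIONS.PART_NAME IN ({partition_list});"""

print(query)
# SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
# FROM PARTITIONS
# INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
# WHERE TBLS.TBL_NAME = 'sales'
# AND PARTITIONS.PART_NAME IN ('ds=2024-12-01', 'ds=2024-12-02');
```

Only TBL_TYPE and TBL_NAME are pulled from TBLS, so nothing collides with the columns contributed by PARTITIONS.*.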
23 changes: 10 additions & 13 deletions providers/tests/google/cloud/hooks/test_dataproc_metastore.py
@@ -60,20 +60,17 @@
 TEST_PARTITION_NAME = "column=value"
 TEST_SUBPARTITION_NAME = "column1=value1/column2=value2"
 TEST_PARTITIONS_QUERY_ALL = """
-    SELECT *
-    FROM PARTITIONS
-    INNER JOIN TBLS
-    ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-    WHERE
-        TBLS.TBL_NAME = '{}';"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{}';"""

 TEST_PARTITIONS_QUERY = """
-    SELECT *
-    FROM PARTITIONS
-    INNER JOIN TBLS
-    ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-    WHERE
-        TBLS.TBL_NAME = '{}'
-    AND PARTITIONS.PART_NAME IN ({});"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{}'
+    AND PARTITIONS.PART_NAME IN ({});"""
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
 DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"

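The two constants above are plain str.format templates: the first placeholder takes the table name and, in TEST_PARTITIONS_QUERY, the second takes the already-quoted partition list. A sketch of how a test might render them (the exact assertion style in test_dataproc_metastore.py is an assumption):

```python
# Hypothetical rendering, mirroring how the hook builds its query.
expected_all = TEST_PARTITIONS_QUERY_ALL.format("test_table")
expected_one = TEST_PARTITIONS_QUERY.format("test_table", f"'{TEST_PARTITION_NAME}'")
```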