diff --git a/docs/apache-airflow-providers-google/operators/cloud/translate.rst b/docs/apache-airflow-providers-google/operators/cloud/translate.rst index 5bda3d9085a6c..4b1cee34617b5 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/translate.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/translate.rst @@ -247,6 +247,48 @@ Basic usage of the operator: :end-before: [END howto_operator_translate_automl_delete_model] +.. _howto/operator:TranslateDocumentOperator: + +TranslateDocumentOperator +^^^^^^^^^^^^^^^^^^^^^^^^^ +Translate Document using Cloud Translate API (Advanced V3). + +For parameter definition, take a look at +:class:`~airflow.providers.google.cloud.operators.translate.TranslateDocumentOperator` + +Using the operator +"""""""""""""""""" + +Basic usage of the operator: + +.. exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_document.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_translate_document] + :end-before: [END howto_operator_translate_document] + + +.. _howto/operator:TranslateDocumentBatchOperator: + +TranslateDocumentBatchOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Translate Documents using Cloud Translate API (Advanced V3), by given input configs. + +For parameter definition, take a look at +:class:`~airflow.providers.google.cloud.operators.translate.TranslateDocumentBatchOperator` + +Using the operator +"""""""""""""""""" + +Basic usage of the operator: + +.. 
exampleinclude:: /../../providers/tests/system/google/cloud/translate/example_translate_document.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_translate_document_batch] + :end-before: [END howto_operator_translate_document_batch] + + More information """""""""""""""""" See: diff --git a/providers/src/airflow/providers/google/cloud/hooks/translate.py b/providers/src/airflow/providers/google/cloud/hooks/translate.py index 43e0c15774bb1..cf9a748d1a2ae 100644 --- a/providers/src/airflow/providers/google/cloud/hooks/translate.py +++ b/providers/src/airflow/providers/google/cloud/hooks/translate.py @@ -39,9 +39,14 @@ from google.api_core.operation import Operation from google.cloud.translate_v3.services.translation_service import pagers from google.cloud.translate_v3.types import ( + BatchDocumentInputConfig, + BatchDocumentOutputConfig, DatasetInputConfig, + DocumentInputConfig, + DocumentOutputConfig, InputConfig, OutputConfig, + TranslateDocumentResponse, TranslateTextGlossaryConfig, TransliterationConfig, automl_translation, @@ -714,3 +719,199 @@ def delete_model( metadata=metadata, ) return result + + def translate_document( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + source_language_code: str | None = None, + target_language_code: str, + location: str | None = None, + document_input_config: DocumentInputConfig | dict, + document_output_config: DocumentOutputConfig | dict | None, + customized_attribution: str | None = None, + is_translate_native_pdf_only: bool = False, + enable_shadow_removal_native_pdf: bool = False, + enable_rotation_correction: bool = False, + model: str | None = None, + glossary_config: TranslateTextGlossaryConfig | None = None, + labels: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + retry: Retry | _MethodDefault | None = DEFAULT, + ) -> TranslateDocumentResponse: + """ + Translate the document provided. + + :param project_id: Required. 
The ID of the Google Cloud project that the service belongs to. + :param source_language_code: Optional. The ISO-639 language code of the + input document text if known. If the source language isn't specified, + the API attempts to identify the source language automatically and returns + the source language within the response. + :param target_language_code: Required. The ISO-639 language code to use + for translation of the input document text. + :param location: Optional. Project or location to make a call. Must refer to + a caller's project. + If not specified, 'global' is used. + Non-global location is required for requests using AutoML models or custom glossaries. + Models and glossaries must be within the same region (have the same location-id). + :param document_input_config: A document translation request input config. + :param document_output_config: Optional. A document translation request output config. + If not provided the translated file will only be returned through a byte-stream + and its output mime type will be the same as the input file's mime type. + :param customized_attribution: Optional. This flag is to support user customized + attribution. If not provided, the default is ``Machine Translated by Google``. + Customized attribution should follow rules in + https://cloud.google.com/translate/attribution#attribution_and_logos + :param is_translate_native_pdf_only: Optional. Param for external + customers. If true, the page limit of online native PDF + translation is 300 and only native PDF pages will be + translated. + :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the + shadow text on background image for native PDF translation. + Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``, + ``pdf_native_only`` are False. + :param enable_rotation_correction: Optional. If true, enable auto rotation + correction in DVS. + :param model: Optional. 
The ``model`` type requested for this translation. + If not provided, the default Google model (NMT) will be used. + The format depends on model type: + + - AutoML Translation models: + ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}`` + - General (built-in) models: + ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``, + + If not provided, the default Google model (NMT) will be used + for translation. + :param glossary_config: Optional. Glossary to be applied. The glossary must be + within the same region (have the same location-id) as the + model. + :param labels: Optional. The labels with user-defined + metadata for the request. + See https://cloud.google.com/translate/docs/advanced/labels for more information. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + + :return: Translate document result from the API response. 
+ """ + client = self.get_client() + location_id = "global" if not location else location + parent = f"projects/{project_id or self.project_id}/locations/{location_id}" + return client.translate_document( + request={ + "parent": parent, + "source_language_code": source_language_code, + "target_language_code": target_language_code, + "document_input_config": document_input_config, + "document_output_config": document_output_config, + "customized_attribution": customized_attribution, + "is_translate_native_pdf_only": is_translate_native_pdf_only, + "enable_shadow_removal_native_pdf": enable_shadow_removal_native_pdf, + "enable_rotation_correction": enable_rotation_correction, + "model": model, + "glossary_config": glossary_config, + "labels": labels, + }, + timeout=timeout, + retry=retry, + metadata=metadata, + ) + + def batch_translate_document( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + source_language_code: str, + target_language_codes: MutableSequence[str] | None = None, + location: str | None = None, + input_configs: MutableSequence[BatchDocumentInputConfig | dict], + output_config: BatchDocumentOutputConfig | dict, + customized_attribution: str | None = None, + format_conversions: MutableMapping[str, str] | None = None, + enable_shadow_removal_native_pdf: bool = False, + enable_rotation_correction: bool = False, + models: MutableMapping[str, str] | None = None, + glossaries: MutableMapping[str, TranslateTextGlossaryConfig] | None = None, + timeout: float | _MethodDefault = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + retry: Retry | _MethodDefault | None = DEFAULT, + ) -> Operation: + """ + Translate documents batch by configs provided. + + :param project_id: Required. The ID of the Google Cloud project that the service belongs to. + :param source_language_code: Optional. The ISO-639 language code of the + input text if known. 
If the source language isn't specified, the API attempts to identify + the source language automatically and returns the source language within the response. + :param target_language_codes: Required. The ISO-639 language code to use + for translation of the input document. Specify up to 10 language codes here. + :param location: Optional. Project or location to make a call. Must refer to + a caller's project. If not specified, 'global' is used. + Non-global location is required for requests using AutoML models or custom glossaries. + Models and glossaries must be within the same region (have the same location-id). + :param input_configs: Input configurations. The total number of files matched should be <= + 100. The total content size to translate should be <= 100M Unicode codepoints. + The files must use UTF-8 encoding. + :param output_config: Output configuration. If 2 input configs match to the same file (that + is, same input path), no output for duplicate inputs will be generated. + :param format_conversions: Optional. The file format conversion map that is applied to + all input files. The map key is the original mime_type. + The map value is the target mime_type of translated documents. + Supported file format conversion includes: + + - ``application/pdf`` to + ``application/vnd.openxmlformats-officedocument.wordprocessingml.document`` + + If nothing specified, output files will be in the same format as the original file. + :param customized_attribution: Optional. This flag is to support user customized + attribution. If not provided, the default is ``Machine Translated by Google``. + Customized attribution should follow rules in + https://cloud.google.com/translate/attribution#attribution_and_logos + :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the + shadow text on background image for native PDF translation. 
+ Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``, + ``pdf_native_only`` are False. + :param enable_rotation_correction: Optional. If true, enable auto rotation + correction in DVS. + :param models: Optional. The models to use for translation. Map's key is + target language code. Map's value is the model name. Value + can be a built-in general model, or an AutoML Translation model. + The value format depends on model type: + + - AutoML Translation models: + ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}`` + - General (built-in) models: + ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``, + + If the map is empty or a specific model is not requested for + a language pair, then default google model (NMT) is used. + :param glossaries: Glossaries to be applied. It's keyed by target language code. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + + :return: Batch translate document result from the API response. 
+ """ + client = self.get_client() + location_id = "global" if not location else location + parent = f"projects/{project_id or self.project_id}/locations/{location_id}" + return client.batch_translate_document( + request={ + "parent": parent, + "source_language_code": source_language_code, + "target_language_codes": target_language_codes, + "input_configs": input_configs, + "output_config": output_config, + "format_conversions": format_conversions, + "customized_attribution": customized_attribution, + "enable_shadow_removal_native_pdf": enable_shadow_removal_native_pdf, + "enable_rotation_correction": enable_rotation_correction, + "models": models, + "glossaries": glossaries, + }, + timeout=timeout, + retry=retry, + metadata=metadata, + ) diff --git a/providers/src/airflow/providers/google/cloud/links/translate.py b/providers/src/airflow/providers/google/cloud/links/translate.py index 55db26508388d..ecf595e9a59c1 100644 --- a/providers/src/airflow/providers/google/cloud/links/translate.py +++ b/providers/src/airflow/providers/google/cloud/links/translate.py @@ -333,3 +333,38 @@ def persist( "project_id": project_id, }, ) + + +class TranslateResultByOutputConfigLink(BaseGoogleLink): + """ + Helper class for constructing Translation results Link. + + Provides link to gcs destination output translation results, by provided output_config + with gcs destination specified. 
+ """ + + name = "Translate Results By Output Config" + key = "translate_results_by_output_config" + format_str = TRANSLATION_TRANSLATE_TEXT_BATCH + + @staticmethod + def extract_output_uri_prefix(output_config): + return output_config["gcs_destination"]["output_uri_prefix"].rpartition("gs://")[-1] + + @staticmethod + def persist( + context: Context, + task_instance, + project_id: str, + output_config: dict, + ): + task_instance.xcom_push( + context, + key=TranslateResultByOutputConfigLink.key, + value={ + "project_id": project_id, + "output_uri_prefix": TranslateResultByOutputConfigLink.extract_output_uri_prefix( + output_config + ), + }, + ) diff --git a/providers/src/airflow/providers/google/cloud/operators/translate.py b/providers/src/airflow/providers/google/cloud/operators/translate.py index 4c04e9a7bc58c..e57b9e46fccae 100644 --- a/providers/src/airflow/providers/google/cloud/operators/translate.py +++ b/providers/src/airflow/providers/google/cloud/operators/translate.py @@ -20,7 +20,7 @@ from __future__ import annotations from collections.abc import MutableMapping, MutableSequence, Sequence -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast from google.api_core.exceptions import GoogleAPICallError from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault @@ -28,6 +28,7 @@ from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.translate import CloudTranslateHook, TranslateHook from airflow.providers.google.cloud.links.translate import ( + TranslateResultByOutputConfigLink, TranslateTextBatchLink, TranslationDatasetsListLink, TranslationModelLink, @@ -40,7 +41,11 @@ if TYPE_CHECKING: from google.api_core.retry import Retry from google.cloud.translate_v3.types import ( + BatchDocumentInputConfig, + BatchDocumentOutputConfig, DatasetInputConfig, + DocumentInputConfig, + DocumentOutputConfig, InputConfig, OutputConfig, TranslateTextGlossaryConfig, @@ -978,3 +983,328 @@ def execute(self, context: 
Context): ) hook.wait_for_operation_done(operation=operation, timeout=self.timeout) self.log.info("Model deletion complete!") + + +class TranslateDocumentOperator(GoogleCloudBaseOperator): + """ + Translate document provided. + + Wraps the Google cloud Translate Text (Advanced) functionality. + Supports wide range of input/output file types, please visit the + https://cloud.google.com/translate/docs/advanced/translate-documents for more details. + + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:TranslateDocumentOperator`. + + :param project_id: Optional. The ID of the Google Cloud project that the + service belongs to. If not specified the hook project_id will be used. + :param source_language_code: Optional. The ISO-639 language code of the + input document text if known. If the source language isn't specified, + the API attempts to identify the source language automatically and returns + the source language within the response. + :param target_language_code: Required. The ISO-639 language code to use + for translation of the input document text. + :param location: Optional. Project or location to make a call. Must refer to a caller's project. + If not specified, 'global' is used. + Non-global location is required for requests using AutoML models or custom glossaries. + Models and glossaries must be within the same region (have the same location-id). + :param document_input_config: A document translation request input config. + :param document_output_config: Optional. A document translation request output config. + If not provided the translated file will only be returned through a byte-stream + and its output mime type will be the same as the input file's mime type. + :param customized_attribution: Optional. This flag is to support user customized + attribution. If not provided, the default is ``Machine Translated by Google``. 
+ Customized attribution should follow rules in + https://cloud.google.com/translate/attribution#attribution_and_logos + :param is_translate_native_pdf_only: Optional. Param for external customers. + If true, the page limit of online native PDF translation is 300 and only native PDF pages + will be translated. + :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the + shadow text on background image for native PDF translation. + Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``, + ``pdf_native_only`` are False. + :param enable_rotation_correction: Optional. If true, enable auto rotation + correction in DVS. + :param model: Optional. The ``model`` type requested for this translation. + If not provided, the default Google model (NMT) will be used. + The format depends on model type: + + - AutoML Translation models: + ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}`` + - General (built-in) models: + ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt`` + + If not provided, the default Google model (NMT) will be used + for translation. + :param glossary_config: Optional. Glossary to be applied. + :param labels: Optional. The labels with user-defined + metadata for the request. + See https://cloud.google.com/translate/docs/advanced/labels for more information. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. 
+ If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + operator_extra_links = (TranslateResultByOutputConfigLink(),) + + template_fields: Sequence[str] = ( + "source_language_code", + "target_language_code", + "document_input_config", + "document_output_config", + "model", + "gcp_conn_id", + "impersonation_chain", + ) + + def __init__( + self, + *, + location: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, + source_language_code: str | None = None, + target_language_code: str, + document_input_config: DocumentInputConfig | dict, + document_output_config: DocumentOutputConfig | dict | None, + customized_attribution: str | None = None, + is_translate_native_pdf_only: bool = False, + enable_shadow_removal_native_pdf: bool = False, + enable_rotation_correction: bool = False, + model: str | None = None, + glossary_config: TranslateTextGlossaryConfig | None = None, + labels: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.source_language_code = source_language_code + self.target_language_code = target_language_code + self.document_input_config = document_input_config + self.document_output_config = document_output_config + self.customized_attribution = customized_attribution + self.is_translate_native_pdf_only = is_translate_native_pdf_only + self.enable_shadow_removal_native_pdf = enable_shadow_removal_native_pdf + self.enable_rotation_correction = enable_rotation_correction + self.location = location + self.labels = labels + self.model = model + self.glossary_config = 
glossary_config + self.metadata = metadata + self.timeout = timeout + self.retry = retry + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict: + """ + Translate the configured document and return the API response as a dict. + + When ``document_output_config`` is set, the results link is also persisted via + ``TranslateResultByOutputConfigLink``. + + :raises AirflowException: if the API call fails with ``GoogleAPICallError``. + """ + hook = TranslateHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + try: + self.log.info("Starting the document translation") + doc_translation_result = hook.translate_document( + # Forward the operator-level project; the hook falls back to its own project_id when this is unset. + project_id=self.project_id, + source_language_code=self.source_language_code, + target_language_code=self.target_language_code, + document_input_config=self.document_input_config, + document_output_config=self.document_output_config, + customized_attribution=self.customized_attribution, + is_translate_native_pdf_only=self.is_translate_native_pdf_only, + enable_shadow_removal_native_pdf=self.enable_shadow_removal_native_pdf, + enable_rotation_correction=self.enable_rotation_correction, + location=self.location, + labels=self.labels, + model=self.model, + glossary_config=self.glossary_config, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + self.log.info("Document translation completed") + except GoogleAPICallError as e: + self.log.error("An error occurred executing translate_document method: \n%s", e) + raise AirflowException(e) + if self.document_output_config: + TranslateResultByOutputConfigLink.persist( + context=context, + task_instance=self, + project_id=self.project_id or hook.project_id, + output_config=self.document_output_config, + ) + return cast(dict, type(doc_translation_result).to_dict(doc_translation_result)) + + +class TranslateDocumentBatchOperator(GoogleCloudBaseOperator): + """ + Translate documents provided via input and output configurations. + + Up to 10 target languages per operation supported. + Wraps the Google cloud Translate Text (Advanced) functionality. + See https://cloud.google.com/translate/docs/advanced/batch-translation. 
+ + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:TranslateDocumentBatchOperator`. + + :param project_id: Optional. The ID of the Google Cloud project that the service belongs to. + If not specified the hook project_id will be used. + :param source_language_code: Required. The ISO-639 language code of the + input text. + :param target_language_codes: Required. The ISO-639 language code to use + for translation of the input document. Specify up to 10 language codes here. + :param location: Optional. Project or location to make a call. Must refer to + a caller's project. If not specified, 'global' is used. + Non-global location is required for requests using AutoML models or custom glossaries. + Models and glossaries must be within the same region (have the same location-id). + :param input_configs: Input configurations. The total number of files matched should be <= + 100. The total content size to translate should be <= 100M Unicode codepoints. + The files must use UTF-8 encoding. + :param output_config: Output configuration. If 2 input configs match to the same file (that + is, same input path), no output for duplicate inputs will be generated. + :param format_conversions: Optional. The file format conversion map that is applied to + all input files. The map key is the original mime_type. + The map value is the target mime_type of translated documents. + Supported file format conversion includes: + + - ``application/pdf`` to + ``application/vnd.openxmlformats-officedocument.wordprocessingml.document`` + + If nothing specified, output files will be in the same format as the original file. + :param customized_attribution: Optional. This flag is to support user customized + attribution. If not provided, the default is ``Machine Translated by Google``. 
+ Customized attribution should follow rules in + https://cloud.google.com/translate/attribution#attribution_and_logos + :param enable_shadow_removal_native_pdf: Optional. If true, use the text removal server to remove the + shadow text on background image for native PDF translation. + Shadow removal feature can only be enabled when both ``is_translate_native_pdf_only``, + ``pdf_native_only`` are False. + :param enable_rotation_correction: Optional. If true, enable auto rotation + correction in DVS. + :param models: Optional. The models to use for translation. Map's key is + target language code. Map's value is the model name. Value + can be a built-in general model, or an AutoML Translation model. + The value format depends on model type: + + - AutoML Translation models: + ``projects/{project-number-or-id}/locations/{location-id}/models/{model-id}`` + + - General (built-in) models: + ``projects/{project-number-or-id}/locations/{location-id}/models/general/nmt``, + + If the map is empty or a specific model is not requested for + a language pair, then default google model (NMT) is used. + :param glossaries: Glossaries to be applied. It's keyed by target language code. + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. 
+ If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + operator_extra_links = (TranslateResultByOutputConfigLink(),) + + template_fields: Sequence[str] = ( + "input_configs", + "output_config", + "target_language_codes", + "source_language_code", + "models", + "glossaries", + "gcp_conn_id", + "impersonation_chain", + ) + + def __init__( + self, + *, + project_id: str = PROVIDE_PROJECT_ID, + source_language_code: str, + target_language_codes: MutableSequence[str] | None = None, + location: str | None = None, + input_configs: MutableSequence[BatchDocumentInputConfig | dict], + output_config: BatchDocumentOutputConfig | dict, + customized_attribution: str | None = None, + format_conversions: MutableMapping[str, str] | None = None, + enable_shadow_removal_native_pdf: bool = False, + enable_rotation_correction: bool = False, + models: MutableMapping[str, str] | None = None, + glossaries: MutableMapping[str, TranslateTextGlossaryConfig] | None = None, + metadata: Sequence[tuple[str, str]] = (), + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.target_language_codes = target_language_codes + self.source_language_code = source_language_code + self.input_configs = input_configs + self.output_config = output_config + self.customized_attribution = customized_attribution + self.format_conversions = format_conversions + self.enable_shadow_removal_native_pdf = enable_shadow_removal_native_pdf + self.enable_rotation_correction = enable_rotation_correction + self.models = models + self.glossaries = glossaries + self.metadata = 
metadata + self.timeout = timeout + self.retry = retry + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict: + hook = TranslateHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + try: + batch_document_translate_operation = hook.batch_translate_document( + project_id=self.project_id, + location=self.location, + target_language_codes=self.target_language_codes, + source_language_code=self.source_language_code, + input_configs=self.input_configs, + output_config=self.output_config, + customized_attribution=self.customized_attribution, + format_conversions=self.format_conversions, + enable_shadow_removal_native_pdf=self.enable_shadow_removal_native_pdf, + enable_rotation_correction=self.enable_rotation_correction, + models=self.models, + glossaries=self.glossaries, + metadata=self.metadata, + timeout=self.timeout, + retry=self.retry, + ) + except GoogleAPICallError as e: + self.log.error("An error occurred executing batch_translate_document method: \n%s", e) + raise AirflowException(e) + self.log.info("Batch document translation job started.") + TranslateResultByOutputConfigLink.persist( + context=context, + task_instance=self, + project_id=self.project_id or hook.project_id, + output_config=self.output_config, + ) + result = hook.wait_for_operation_result(batch_document_translate_operation) + self.log.info("Batch document translation job finished") + return cast(dict, type(result).to_dict(result)) diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml index 44eb99a43f6d9..cf3691ffbef40 100644 --- a/providers/src/airflow/providers/google/provider.yaml +++ b/providers/src/airflow/providers/google/provider.yaml @@ -1297,6 +1297,7 @@ extra-links: - airflow.providers.google.cloud.links.translate.TranslationDatasetsListLink - airflow.providers.google.cloud.links.translate.TranslationModelLink - 
airflow.providers.google.cloud.links.translate.TranslationModelsListLink + - airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink secrets-backends: diff --git a/providers/tests/google/cloud/operators/test_translate.py b/providers/tests/google/cloud/operators/test_translate.py index d1b6a9fa009f7..2e4217ee53a8e 100644 --- a/providers/tests/google/cloud/operators/test_translate.py +++ b/providers/tests/google/cloud/operators/test_translate.py @@ -20,7 +20,11 @@ from unittest import mock from google.api_core.gapic_v1.method import DEFAULT -from google.cloud.translate_v3.types import automl_translation +from google.cloud.translate_v3.types import ( + BatchTranslateDocumentResponse, + TranslateDocumentResponse, + automl_translation, +) from airflow.providers.google.cloud.hooks.translate import TranslateHook from airflow.providers.google.cloud.operators.translate import ( @@ -30,20 +34,21 @@ TranslateDatasetsListOperator, TranslateDeleteDatasetOperator, TranslateDeleteModelOperator, + TranslateDocumentBatchOperator, + TranslateDocumentOperator, TranslateImportDataOperator, TranslateModelsListOperator, TranslateTextBatchOperator, TranslateTextOperator, ) -from providers.tests.system.google.cloud.tasks.example_tasks import LOCATION - GCP_CONN_ID = "google_cloud_default" IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] PROJECT_ID = "test-project-id" DATASET_ID = "sample_ds_id" MODEL_ID = "sample_model_id" TIMEOUT_VALUE = 30 +LOCATION = "location_id" class TestCloudTranslate: @@ -542,3 +547,164 @@ def test_minimal_green_path(self, mock_hook): metadata=(), ) wait_for_done.assert_called_once_with(operation=m_delete_method_result, timeout=TIMEOUT_VALUE) + + +class TestTranslateDocumentBatchOperator: + @mock.patch("airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink.persist") + @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook") + def test_minimal_green_path(self, mock_hook, 
mock_link_persist): + input_config_item_1 = { + "gcs_source": {"input_uri": "gs://source_bucket_uri/sample_data_src_lang_1.txt"}, + } + input_config_item_2 = { + "gcs_source": {"input_uri": "gs://source_bucket_uri/sample_data_src_lang_2.txt"}, + } + SRC_LANG_CODE = "src_lang_code" + TARGET_LANG_CODES = ["target_lang_code1", "target_lang_code2"] + TIMEOUT = 30 + INPUT_CONFIGS = [input_config_item_1, input_config_item_2] + OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://source_bucket_uri/output/"}} + BATCH_DOC_TRANSLATION_RESULT = { + "submit_time": "2024-12-01T00:01:16Z", + "end_time": "2024-12-01T00:10:01Z", + "failed_characters": "0", + "failed_pages": "0", + "total_billable_characters": "0", + "total_billable_pages": "6", + "total_characters": "4240", + "total_pages": "6", + "translated_characters": "4240", + "translated_pages": "6", + } + sample_operation = mock.MagicMock() + sample_operation.result.return_value = BatchTranslateDocumentResponse(BATCH_DOC_TRANSLATION_RESULT) + + mock_hook.return_value.batch_translate_document.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + + op = TranslateDocumentBatchOperator( + task_id="task_id_test", + project_id=PROJECT_ID, + source_language_code=SRC_LANG_CODE, + target_language_codes=TARGET_LANG_CODES, + location=LOCATION, + models=None, + glossaries=None, + input_configs=INPUT_CONFIGS, + output_config=OUTPUT_CONFIG, + customized_attribution=None, + format_conversions=None, + enable_shadow_removal_native_pdf=False, + enable_rotation_correction=False, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + metadata=(), + timeout=TIMEOUT, + retry=None, + ) + context = {"ti": mock.MagicMock()} + result = op.execute(context=context) + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + 
mock_hook.return_value.batch_translate_document.assert_called_once_with( + project_id=PROJECT_ID, + source_language_code=SRC_LANG_CODE, + target_language_codes=TARGET_LANG_CODES, + location=LOCATION, + input_configs=INPUT_CONFIGS, + output_config=OUTPUT_CONFIG, + customized_attribution=None, + format_conversions=None, + enable_shadow_removal_native_pdf=False, + enable_rotation_correction=False, + timeout=TIMEOUT, + models=None, + glossaries=None, + retry=None, + metadata=(), + ) + + assert result == BATCH_DOC_TRANSLATION_RESULT + mock_link_persist.assert_called_once_with( + context=context, + task_instance=op, + project_id=PROJECT_ID, + output_config=OUTPUT_CONFIG, + ) + + +class TestTranslateDocumentOperator: + @mock.patch("airflow.providers.google.cloud.links.translate.TranslateResultByOutputConfigLink.persist") + @mock.patch("airflow.providers.google.cloud.operators.translate.TranslateHook") + def test_minimal_green_path(self, mock_hook, mock_link_persist): + SRC_LANG_CODE = "src_lang_code" + TARGET_LANG_CODE = "target_lang_code1" + TIMEOUT = 30 + INPUT_CONFIG = {"gcs_source": {"input_uri": "gs://source_bucket_uri/sample_data_src_lang_1.txt"}} + OUTPUT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://source_bucket_uri/output/"}} + DOC_TRANSLATION_RESULT = { + "document_translation": { + "byte_stream_outputs": ["c29tZV9kYXRh"], + "detected_language_code": "", + "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + }, + "model": f"projects/{PROJECT_ID}/locations/us-central1/models/general/nmt", + } + + mock_hook.return_value.translate_document.return_value = TranslateDocumentResponse( + DOC_TRANSLATION_RESULT + ) + + op = TranslateDocumentOperator( + task_id="task_id_test", + project_id=PROJECT_ID, + source_language_code=SRC_LANG_CODE, + target_language_code=TARGET_LANG_CODE, + location=LOCATION, + model=None, + glossary_config=None, + labels=None, + document_input_config=INPUT_CONFIG, + document_output_config=OUTPUT_CONFIG, + 
customized_attribution=None, + is_translate_native_pdf_only=False, + enable_shadow_removal_native_pdf=False, + enable_rotation_correction=False, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + timeout=TIMEOUT, + retry=None, + ) + context = {"ti": mock.MagicMock()} + result = op.execute(context=context) + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_hook.return_value.translate_document.assert_called_once_with( + source_language_code=SRC_LANG_CODE, + target_language_code=TARGET_LANG_CODE, + location=LOCATION, + model=None, + glossary_config=None, + labels=None, + document_input_config=INPUT_CONFIG, + document_output_config=OUTPUT_CONFIG, + customized_attribution=None, + is_translate_native_pdf_only=False, + enable_shadow_removal_native_pdf=False, + enable_rotation_correction=False, + timeout=TIMEOUT, + retry=None, + metadata=(), + ) + + assert result == DOC_TRANSLATION_RESULT + mock_link_persist.assert_called_once_with( + context=context, + task_instance=op, + project_id=PROJECT_ID, + output_config=OUTPUT_CONFIG, + ) diff --git a/providers/tests/system/google/cloud/translate/example_translate_document.py b/providers/tests/system/google/cloud/translate/example_translate_document.py new file mode 100644 index 0000000000000..05ed41e8c69b8 --- /dev/null +++ b/providers/tests/system/google/cloud/translate/example_translate_document.py @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that translates documents in Google Cloud Translate using V3 API version +service in the Google Cloud. +""" + +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.gcs import ( + GCSCreateBucketOperator, + GCSDeleteBucketOperator, +) +from airflow.providers.google.cloud.operators.translate import ( + TranslateDocumentBatchOperator, + TranslateDocumentOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +DAG_ID = "gcp_translate_document" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +REGION = "us-central1" +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +DATA_OUTPUT_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-") + +DOC_TRANSLATE_INPUT = { + "gcs_source": { + "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/document_translate/translate_me_sample.xlsx" + }, +} +GCS_OUTPUT_DST = { + "gcs_destination": {"output_uri_prefix": f"gs://{DATA_OUTPUT_BUCKET_NAME}/doc_translate_output/"} +} +BATCH_DOC_INPUT_ITEM_1 = { + "gcs_source": { + "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/batch_document_translate/batch_translate_doc_sample_1.docx" + } +} +BATCH_DOC_INPUT_ITEM_2 = { + "gcs_source": { + "input_uri": f"gs://{RESOURCE_DATA_BUCKET}/V3_translate/batch_document_translate/batch_translate_sample_2.pdf" + } +} +BATCH_OUTPUT_CONFIG = { + "gcs_destination": {"output_uri_prefix":
f"gs://{DATA_OUTPUT_BUCKET_NAME}/batch_translate_docs_output/"} +} + + +with DAG( + DAG_ID, + schedule="@once", # Override to match your needs + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["example", "document_translate", "document_translate_batch", "translate_V3"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", + bucket_name=DATA_OUTPUT_BUCKET_NAME, + storage_class="REGIONAL", + location=REGION, + ) + # [START howto_operator_translate_document] + translate_document = TranslateDocumentOperator( + task_id="translate_document_op", + project_id=PROJECT_ID, + location=REGION, + source_language_code="en", + target_language_code="uk", + document_input_config=DOC_TRANSLATE_INPUT, + document_output_config=GCS_OUTPUT_DST, + ) + # [END howto_operator_translate_document] + + # [START howto_operator_translate_document_batch] + translate_document_batch = TranslateDocumentBatchOperator( + task_id="batch_translate_document_op", + project_id=PROJECT_ID, + location=REGION, + source_language_code="en", + target_language_codes=["uk", "fr"], + input_configs=[BATCH_DOC_INPUT_ITEM_1, BATCH_DOC_INPUT_ITEM_2], + output_config=BATCH_OUTPUT_CONFIG, + ) + # [END howto_operator_translate_document_batch] + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", + bucket_name=DATA_OUTPUT_BUCKET_NAME, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + create_bucket + # TEST BODY + >> [translate_document, translate_document_batch] + # TEST TEARDOWN + >> delete_bucket + ) + + from tests_common.test_utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)