From 1bc46755a9bbcbe66dedf28f18235d8d3d650525 Mon Sep 17 00:00:00 2001 From: rajaths010494 Date: Fri, 6 Jan 2023 14:34:25 +0530 Subject: [PATCH 1/7] Add python_callable for decorators --- python-sdk/src/astro/lineage/__init__.py | 1 + .../src/astro/sql/operators/base_decorator.py | 24 ++++++++++++++++++- .../src/astro/sql/operators/dataframe.py | 23 ++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/python-sdk/src/astro/lineage/__init__.py b/python-sdk/src/astro/lineage/__init__.py index 4d1949b01..88d7a16f8 100644 --- a/python-sdk/src/astro/lineage/__init__.py +++ b/python-sdk/src/astro/lineage/__init__.py @@ -13,6 +13,7 @@ OutputStatisticsOutputDatasetFacet, SchemaDatasetFacet, SchemaField, + SourceCodeJobFacet, SqlJobFacet, ) from openlineage.client.run import Dataset as OpenlineageDataset diff --git a/python-sdk/src/astro/sql/operators/base_decorator.py b/python-sdk/src/astro/sql/operators/base_decorator.py index 4d6c13c66..f97629b13 100644 --- a/python-sdk/src/astro/sql/operators/base_decorator.py +++ b/python-sdk/src/astro/sql/operators/base_decorator.py @@ -1,7 +1,8 @@ from __future__ import annotations import inspect -from typing import Any, Sequence, cast +import os +from typing import Any, Callable, Sequence, cast import pandas as pd from airflow.decorators.base import DecoratedOperator @@ -206,6 +207,7 @@ def get_openlineage_facets_on_complete(self, task_instance): OutputStatisticsOutputDatasetFacet, SchemaDatasetFacet, SchemaField, + SourceCodeJobFacet, SqlJobFacet, ) @@ -264,11 +266,31 @@ def get_openlineage_facets_on_complete(self, task_instance): base_sql_query = task_instance.xcom_pull(task_ids=task_instance.task_id, key="base_sql_query") job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=base_sql_query)} + collect_source = os.environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "True") + source_code = self.get_source_code(task_instance.task.python_callable) + if collect_source and source_code: + job_facets.update( + { + "sourceCode": SourceCodeJobFacet("python", source_code), + } + ) return OperatorLineage( inputs=input_dataset, outputs=output_dataset, run_facets=run_facets, job_facets=job_facets ) + def get_source_code(self, callable: Callable) -> str | None: + import inspect + + try: + return inspect.getsource(callable) + except TypeError: + # Trying to extract source code of builtin_function_or_method + return str(callable) + except OSError: + self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") + return None + def load_op_arg_dataframes_into_sql(conn_id: str, op_args: tuple, target_table: BaseTable) -> tuple: """ diff --git a/python-sdk/src/astro/sql/operators/dataframe.py b/python-sdk/src/astro/sql/operators/dataframe.py index 5597d8488..538056c93 100644 --- a/python-sdk/src/astro/sql/operators/dataframe.py +++ b/python-sdk/src/astro/sql/operators/dataframe.py @@ -222,6 +222,8 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 """ Collect the input, output, job and run facets for DataframeOperator """ + import os + from astro.lineage import ( BaseFacet, DataSourceDatasetFacet, @@ -230,6 +232,7 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 OutputStatisticsOutputDatasetFacet, SchemaDatasetFacet, SchemaField, + SourceCodeJobFacet, ) output_dataset: list[OpenlineageDataset] = [] @@ -262,10 +265,30 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 run_facets: dict[str, BaseFacet] = {} job_facets: dict[str, BaseFacet] = {} + collect_source = os.environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "True") + source_code = self.get_source_code(task_instance.task.python_callable) + if collect_source and source_code: + job_facets.update( + { + "sourceCode": SourceCodeJobFacet("python", source_code), + } + ) return OperatorLineage( inputs=[], outputs=output_dataset, run_facets=run_facets, job_facets=job_facets ) + def get_source_code(self, callable: Callable) -> str | None: + import inspect + + try: + return inspect.getsource(callable) + except TypeError: + # Trying to extract source code of builtin_function_or_method + return str(callable) + except OSError: + self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") + return None + def dataframe( python_callable: Callable | None = None, From 9992e516d552ed525fb875d9a20d865f2d5e1e0d Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 6 Jan 2023 23:29:25 +0530 Subject: [PATCH 2/7] Fix deepsource --- python-sdk/src/astro/sql/operators/base_decorator.py | 11 +++++------ python-sdk/src/astro/sql/operators/dataframe.py | 11 +++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/python-sdk/src/astro/sql/operators/base_decorator.py b/python-sdk/src/astro/sql/operators/base_decorator.py index f97629b13..909294981 100644 --- a/python-sdk/src/astro/sql/operators/base_decorator.py +++ b/python-sdk/src/astro/sql/operators/base_decorator.py @@ -279,17 +279,16 @@ def get_openlineage_facets_on_complete(self, task_instance): inputs=input_dataset, outputs=output_dataset, run_facets=run_facets, job_facets=job_facets ) - def get_source_code(self, callable: Callable) -> str | None: - import inspect - + def get_source_code(self, py_callable: Callable) -> str | None: + """Return the source code for the lineage""" try: - return inspect.getsource(callable) + return inspect.getsource(py_callable) except TypeError: # Trying to extract source code of builtin_function_or_method - return str(callable) + return str(py_callable) except OSError: self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") - return None + return None def load_op_arg_dataframes_into_sql(conn_id: str, op_args: tuple, target_table: BaseTable) -> tuple: diff --git a/python-sdk/src/astro/sql/operators/dataframe.py b/python-sdk/src/astro/sql/operators/dataframe.py index 538056c93..b39ef6ffc 100644 --- a/python-sdk/src/astro/sql/operators/dataframe.py +++ b/python-sdk/src/astro/sql/operators/dataframe.py @@ -277,17 +277,16 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 inputs=[], outputs=output_dataset, run_facets=run_facets, job_facets=job_facets ) - def get_source_code(self, callable: Callable) -> str | None: - import inspect - + def get_source_code(self, py_callable: Callable) -> str | None: + """Return the source code for the lineage""" try: - return inspect.getsource(callable) + return inspect.getsource(py_callable) except TypeError: # Trying to extract source code of builtin_function_or_method - return str(callable) + return str(py_callable) except OSError: self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") - return None + return None def dataframe( From 334a26d50c7660d316bac30c8234fa066b77d7be Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 7 Jan 2023 00:59:20 +0530 Subject: [PATCH 3/7] Add tests --- .../tests/sql/operators/test_base_decorator.py | 18 ++++++++++++++++++ .../tests/sql/operators/test_dataframe.py | 15 +++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/python-sdk/tests/sql/operators/test_base_decorator.py b/python-sdk/tests/sql/operators/test_base_decorator.py index 1459f0e78..da8e1c586 100644 --- a/python-sdk/tests/sql/operators/test_base_decorator.py +++ b/python-sdk/tests/sql/operators/test_base_decorator.py @@ -1,3 +1,7 @@ +from unittest import mock +import pytest + +from astro.sql import RawSQLOperator from astro.sql.operators.base_decorator import BaseSQLDecoratedOperator @@ -7,3 +11,17 @@ def test_base_sql_decorated_operator_template_fields_with_parameters(): as this required for taskflow to work if XCom args are being passed via parameters. """ assert "parameters" in BaseSQLDecoratedOperator.template_fields + + +@pytest.mark.parametrize( + "exception", [OSError("os error"), TypeError("type error")] +) +@mock.patch("astro.sql.operators.base_decorator.inspect.getsource", autospec=True) +def test_get_source_code_handle_exception(mock_getsource, exception): + """assert get_source_code not raise exception""" + mock_getsource.side_effect = exception + RawSQLOperator( + task_id="test", + sql="select * from 1", + python_callable=lambda: 1 + ).get_source_code(py_callable=None) diff --git a/python-sdk/tests/sql/operators/test_dataframe.py b/python-sdk/tests/sql/operators/test_dataframe.py index ef973c17c..a2525103d 100644 --- a/python-sdk/tests/sql/operators/test_dataframe.py +++ b/python-sdk/tests/sql/operators/test_dataframe.py @@ -13,6 +13,7 @@ from astro.airflow.datasets import DATASET_SUPPORT from astro.files import File from astro.table import Table +from astro.sql.operators.dataframe import DataframeOperator from ..operators import utils as test_utils @@ -235,3 +236,17 @@ def count_df(df: pandas.DataFrame): test_utils.run_dag(sample_dag) mock_serde.serialize.assert_not_called() mock_serde.deserialize.assert_not_called() + + +@pytest.mark.parametrize( + "exception", [OSError("os error"), TypeError("type error")] +) +@mock.patch("astro.sql.operators.base_decorator.inspect.getsource", autospec=True) +def test_get_source_code_handle_exception(mock_getsource, exception): + """assert get_source_code not raise exception""" + mock_getsource.side_effect = exception + DataframeOperator( + task_id="test", + sql="select * from 1", + python_callable=lambda: 1 + ).get_source_code(py_callable=None) From e7744908a16ff259b7c9e292afa35a971f294c8f Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 7 Jan 2023 01:11:31 +0530 Subject: [PATCH 4/7] Fix tests --- python-sdk/tests/sql/operators/test_dataframe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python-sdk/tests/sql/operators/test_dataframe.py b/python-sdk/tests/sql/operators/test_dataframe.py index a2525103d..991091837 100644 --- a/python-sdk/tests/sql/operators/test_dataframe.py +++ b/python-sdk/tests/sql/operators/test_dataframe.py @@ -247,6 +247,5 @@ def test_get_source_code_handle_exception(mock_getsource, exception): mock_getsource.side_effect = exception DataframeOperator( task_id="test", - sql="select * from 1", python_callable=lambda: 1 ).get_source_code(py_callable=None) From 85ac23d7810374a6c4517082d0283882ecefe7d1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 6 Jan 2023 20:13:04 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../tests/sql/operators/test_base_decorator.py | 13 +++++-------- python-sdk/tests/sql/operators/test_dataframe.py | 11 +++-------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/python-sdk/tests/sql/operators/test_base_decorator.py b/python-sdk/tests/sql/operators/test_base_decorator.py index da8e1c586..1d7e56ba7 100644 --- a/python-sdk/tests/sql/operators/test_base_decorator.py +++ b/python-sdk/tests/sql/operators/test_base_decorator.py @@ -1,4 +1,5 @@ from unittest import mock + import pytest from astro.sql import RawSQLOperator @@ -13,15 +14,11 @@ def test_base_sql_decorated_operator_template_fields_with_parameters(): assert "parameters" in BaseSQLDecoratedOperator.template_fields -@pytest.mark.parametrize( - "exception", [OSError("os error"), TypeError("type error")] -) +@pytest.mark.parametrize("exception", [OSError("os error"), TypeError("type error")]) @mock.patch("astro.sql.operators.base_decorator.inspect.getsource", autospec=True) def test_get_source_code_handle_exception(mock_getsource, exception): """assert get_source_code not raise exception""" mock_getsource.side_effect = exception - RawSQLOperator( - task_id="test", - sql="select * from 1", - python_callable=lambda: 1 - ).get_source_code(py_callable=None) + RawSQLOperator(task_id="test", sql="select * from 1", python_callable=lambda: 1).get_source_code( + py_callable=None + ) diff --git a/python-sdk/tests/sql/operators/test_dataframe.py b/python-sdk/tests/sql/operators/test_dataframe.py index 991091837..722287252 100644 --- a/python-sdk/tests/sql/operators/test_dataframe.py +++ b/python-sdk/tests/sql/operators/test_dataframe.py @@ -12,8 +12,8 @@ import astro.sql as aql from astro.airflow.datasets import DATASET_SUPPORT from astro.files import File -from astro.table import Table from astro.sql.operators.dataframe import DataframeOperator +from astro.table import Table from ..operators import utils as test_utils @@ -238,14 +238,9 @@ def count_df(df: pandas.DataFrame): mock_serde.deserialize.assert_not_called() -@pytest.mark.parametrize( - "exception", [OSError("os error"), TypeError("type error")] -) +@pytest.mark.parametrize("exception", [OSError("os error"), TypeError("type error")]) @mock.patch("astro.sql.operators.base_decorator.inspect.getsource", autospec=True) def test_get_source_code_handle_exception(mock_getsource, exception): """assert get_source_code not raise exception""" mock_getsource.side_effect = exception - DataframeOperator( - task_id="test", - python_callable=lambda: 1 - ).get_source_code(py_callable=None) + DataframeOperator(task_id="test", python_callable=lambda: 1).get_source_code(py_callable=None) From 7c3161ee739a39841d4db0cd3f7dfdc24e7c3171 Mon Sep 17 00:00:00 2001 From: rajaths010494 Date: Mon, 9 Jan 2023 15:41:19 +0530 Subject: [PATCH 6/7] Fix review comments --- python-sdk/docs/guides/openlineage.rst | 3 +++ python-sdk/src/astro/settings.py | 3 +++ python-sdk/src/astro/sql/operators/base_decorator.py | 7 +++---- python-sdk/src/astro/sql/operators/dataframe.py | 7 +++---- python-sdk/tests_integration/extractors/test_extractor.py | 4 ++++ 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/python-sdk/docs/guides/openlineage.rst b/python-sdk/docs/guides/openlineage.rst index 192ce7ede..346085c57 100644 --- a/python-sdk/docs/guides/openlineage.rst +++ b/python-sdk/docs/guides/openlineage.rst @@ -19,6 +19,9 @@ We'll need to specify where we want Astro Python SDK operators to send OpenLinea ``OPENLINEAGE_URL`` environment variable to send OpenLineage events to Marquez. Optionally, we can also specify a namespace where the lineage events will be stored using the ``OPENLINEAGE_NAMESPACE`` environment variable. +A user may choose to send or not, the source code to the OpenLineage then user can specify an environment variable +``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` set with either ``True`` or ``False``. By default it will be set to ``True``. + For example, to send OpenLineage events to a local instance of Marquez with the dev namespace, use: .. code-block:: ini diff --git a/python-sdk/src/astro/settings.py b/python-sdk/src/astro/settings.py index eefddaafb..51a1285b4 100644 --- a/python-sdk/src/astro/settings.py +++ b/python-sdk/src/astro/settings.py @@ -35,6 +35,9 @@ OPENLINEAGE_EMIT_TEMP_TABLE_EVENT = conf.getboolean( SECTION_KEY, "openlineage_emit_temp_table_event", fallback=True ) +OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE = conf.getboolean( + SECTION_KEY, "openlineage_airflow_disable_source_code", fallback=True +) XCOM_BACKEND = conf.get("core", "xcom_backend") IS_BASE_XCOM_BACKEND = conf.get("core", "xcom_backend") == "airflow.models.xcom.BaseXCom" ENABLE_XCOM_PICKLING = conf.getboolean("core", "enable_xcom_pickling") diff --git a/python-sdk/src/astro/sql/operators/base_decorator.py b/python-sdk/src/astro/sql/operators/base_decorator.py index 909294981..640e76cb7 100644 --- a/python-sdk/src/astro/sql/operators/base_decorator.py +++ b/python-sdk/src/astro/sql/operators/base_decorator.py @@ -1,7 +1,6 @@ from __future__ import annotations import inspect -import os from typing import Any, Callable, Sequence, cast import pandas as pd @@ -210,6 +209,7 @@ def get_openlineage_facets_on_complete(self, task_instance): SourceCodeJobFacet, SqlJobFacet, ) + from astro.settings import OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE input_dataset: list[OpenlineageDataset] = [] output_dataset: list[OpenlineageDataset] = [] @@ -266,9 +266,8 @@ def get_openlineage_facets_on_complete(self, task_instance): base_sql_query = task_instance.xcom_pull(task_ids=task_instance.task_id, key="base_sql_query") job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=base_sql_query)} - collect_source = os.environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "True") source_code = self.get_source_code(task_instance.task.python_callable) - if collect_source and source_code: + if OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE and source_code: job_facets.update( { "sourceCode": SourceCodeJobFacet("python", source_code), @@ -288,7 +287,7 @@ def get_source_code(self, py_callable: Callable) -> str | None: return str(py_callable) except OSError: self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") - return None + return None def load_op_arg_dataframes_into_sql(conn_id: str, op_args: tuple, target_table: BaseTable) -> tuple: diff --git a/python-sdk/src/astro/sql/operators/dataframe.py b/python-sdk/src/astro/sql/operators/dataframe.py index b39ef6ffc..a3c5febdb 100644 --- a/python-sdk/src/astro/sql/operators/dataframe.py +++ b/python-sdk/src/astro/sql/operators/dataframe.py @@ -222,7 +222,6 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 """ Collect the input, output, job and run facets for DataframeOperator """ - import os from astro.lineage import ( BaseFacet, @@ -234,6 +233,7 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 SchemaField, SourceCodeJobFacet, ) + from astro.settings import OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE output_dataset: list[OpenlineageDataset] = [] @@ -265,9 +265,8 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061 run_facets: dict[str, BaseFacet] = {} job_facets: dict[str, BaseFacet] = {} - collect_source = os.environ.get("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "True") source_code = self.get_source_code(task_instance.task.python_callable) - if collect_source and source_code: + if OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE and source_code: job_facets.update( { "sourceCode": SourceCodeJobFacet("python", source_code), @@ -286,7 +285,7 @@ def get_source_code(self, py_callable: Callable) -> str | None: return str(py_callable) except OSError: self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") - return None + return None def dataframe( diff --git a/python-sdk/tests_integration/extractors/test_extractor.py b/python-sdk/tests_integration/extractors/test_extractor.py index cfa5f677e..0aa1df92e 100644 --- a/python-sdk/tests_integration/extractors/test_extractor.py +++ b/python-sdk/tests_integration/extractors/test_extractor.py @@ -296,6 +296,10 @@ def top_five_animations(input_table: Table) -> str: assert task_meta_extract is None task_meta = python_sdk_extractor(task.operator).extract_on_complete(task_instance=task_instance) assert task_meta.name == f"adhoc_airflow.{task_id}" + source_code = task_meta.job_facets.get("sourceCode") + # check for transform code return is present in source code facet. + validate_string = """return "SELECT title, rating FROM {{ input_table }} LIMIT 5;""" + assert validate_string in source_code.source assert len(task_meta.job_facets) > 0 assert task_meta.run_facets == {} From 123c37fcb3f0ae2d30402e3b4f7c578f20bea53f Mon Sep 17 00:00:00 2001 From: rajaths010494 Date: Mon, 9 Jan 2023 18:40:32 +0530 Subject: [PATCH 7/7] return none --- python-sdk/src/astro/sql/operators/base_decorator.py | 2 +- python-sdk/src/astro/sql/operators/dataframe.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python-sdk/src/astro/sql/operators/base_decorator.py b/python-sdk/src/astro/sql/operators/base_decorator.py index 640e76cb7..5dd44d246 100644 --- a/python-sdk/src/astro/sql/operators/base_decorator.py +++ b/python-sdk/src/astro/sql/operators/base_decorator.py @@ -287,7 +287,7 @@ def get_source_code(self, py_callable: Callable) -> str | None: return str(py_callable) except OSError: self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") - return None + return None def load_op_arg_dataframes_into_sql(conn_id: str, op_args: tuple, target_table: BaseTable) -> tuple: diff --git a/python-sdk/src/astro/sql/operators/dataframe.py b/python-sdk/src/astro/sql/operators/dataframe.py index a3c5febdb..f4d4e2a4d 100644 --- a/python-sdk/src/astro/sql/operators/dataframe.py +++ b/python-sdk/src/astro/sql/operators/dataframe.py @@ -285,7 +285,7 @@ def get_source_code(self, py_callable: Callable) -> str | None: return str(py_callable) except OSError: self.log.warning("Can't get source code facet of Operator {self.operator.task_id}") - return None + return None def dataframe(