Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add source code facet for operators #1537

Merged
merged 7 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python-sdk/docs/guides/openlineage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ We'll need to specify where we want Astro Python SDK operators to send OpenLinea
``OPENLINEAGE_URL`` environment variable to send OpenLineage events to Marquez. Optionally, we can also
specify a namespace where the lineage events will be stored using the ``OPENLINEAGE_NAMESPACE`` environment variable.

A user may choose to send or not, the source code to the OpenLineage then user can specify an environment variable
``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` set with either ``True`` or ``False``. By default it will be set to ``True``.

For example, to send OpenLineage events to a local instance of Marquez with the dev namespace, use:

.. code-block:: ini
Expand Down
1 change: 1 addition & 0 deletions python-sdk/src/astro/lineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
OutputStatisticsOutputDatasetFacet,
SchemaDatasetFacet,
SchemaField,
SourceCodeJobFacet,
SqlJobFacet,
)
from openlineage.client.run import Dataset as OpenlineageDataset
Expand Down
3 changes: 3 additions & 0 deletions python-sdk/src/astro/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
OPENLINEAGE_EMIT_TEMP_TABLE_EVENT = conf.getboolean(
SECTION_KEY, "openlineage_emit_temp_table_event", fallback=True
)
OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE = conf.getboolean(
SECTION_KEY, "openlineage_airflow_disable_source_code", fallback=True
)
XCOM_BACKEND = conf.get("core", "xcom_backend")
IS_BASE_XCOM_BACKEND = conf.get("core", "xcom_backend") == "airflow.models.xcom.BaseXCom"
ENABLE_XCOM_PICKLING = conf.getboolean("core", "enable_xcom_pickling")
Expand Down
22 changes: 21 additions & 1 deletion python-sdk/src/astro/sql/operators/base_decorator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import inspect
from typing import Any, Sequence, cast
from typing import Any, Callable, Sequence, cast

import pandas as pd
from airflow.decorators.base import DecoratedOperator
Expand Down Expand Up @@ -206,8 +206,10 @@ def get_openlineage_facets_on_complete(self, task_instance):
OutputStatisticsOutputDatasetFacet,
SchemaDatasetFacet,
SchemaField,
SourceCodeJobFacet,
SqlJobFacet,
)
from astro.settings import OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE

input_dataset: list[OpenlineageDataset] = []
output_dataset: list[OpenlineageDataset] = []
Expand Down Expand Up @@ -264,11 +266,29 @@ def get_openlineage_facets_on_complete(self, task_instance):

base_sql_query = task_instance.xcom_pull(task_ids=task_instance.task_id, key="base_sql_query")
job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=base_sql_query)}
source_code = self.get_source_code(task_instance.task.python_callable)
if OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE and source_code:
job_facets.update(
{
"sourceCode": SourceCodeJobFacet("python", source_code),
}
)

return OperatorLineage(
inputs=input_dataset, outputs=output_dataset, run_facets=run_facets, job_facets=job_facets
)

def get_source_code(self, py_callable: Callable) -> str | None:
"""Return the source code for the lineage"""
try:
return inspect.getsource(py_callable)
except TypeError:
# Trying to extract source code of builtin_function_or_method
return str(py_callable)
except OSError:
self.log.warning("Can't get source code facet of Operator {self.operator.task_id}")
return None
rajaths010494 marked this conversation as resolved.
Show resolved Hide resolved


def load_op_arg_dataframes_into_sql(conn_id: str, op_args: tuple, target_table: BaseTable) -> tuple:
"""
Expand Down
21 changes: 21 additions & 0 deletions python-sdk/src/astro/sql/operators/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061
"""
Collect the input, output, job and run facets for DataframeOperator
"""

from astro.lineage import (
BaseFacet,
DataSourceDatasetFacet,
Expand All @@ -230,7 +231,9 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061
OutputStatisticsOutputDatasetFacet,
SchemaDatasetFacet,
SchemaField,
SourceCodeJobFacet,
)
from astro.settings import OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE

output_dataset: list[OpenlineageDataset] = []

Expand Down Expand Up @@ -262,10 +265,28 @@ def get_openlineage_facets_on_complete(self, task_instance): # skipcq: PYL-W061

run_facets: dict[str, BaseFacet] = {}
job_facets: dict[str, BaseFacet] = {}
source_code = self.get_source_code(task_instance.task.python_callable)
if OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE and source_code:
job_facets.update(
{
"sourceCode": SourceCodeJobFacet("python", source_code),
}
)
return OperatorLineage(
inputs=[], outputs=output_dataset, run_facets=run_facets, job_facets=job_facets
)

def get_source_code(self, py_callable: Callable) -> str | None:
"""Return the source code for the lineage"""
try:
return inspect.getsource(py_callable)
except TypeError:
# Trying to extract source code of builtin_function_or_method
return str(py_callable)
except OSError:
self.log.warning("Can't get source code facet of Operator {self.operator.task_id}")
return None
rajaths010494 marked this conversation as resolved.
Show resolved Hide resolved


def dataframe(
python_callable: Callable | None = None,
Expand Down
15 changes: 15 additions & 0 deletions python-sdk/tests/sql/operators/test_base_decorator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from unittest import mock

import pytest

from astro.sql import RawSQLOperator
from astro.sql.operators.base_decorator import BaseSQLDecoratedOperator


Expand All @@ -7,3 +12,13 @@ def test_base_sql_decorated_operator_template_fields_with_parameters():
as this required for taskflow to work if XCom args are being passed via parameters.
"""
assert "parameters" in BaseSQLDecoratedOperator.template_fields


@pytest.mark.parametrize("exception", [OSError("os error"), TypeError("type error")])
rajaths010494 marked this conversation as resolved.
Show resolved Hide resolved
@mock.patch("astro.sql.operators.base_decorator.inspect.getsource", autospec=True)
def test_get_source_code_handle_exception(mock_getsource, exception):
"""assert get_source_code not raise exception"""
mock_getsource.side_effect = exception
RawSQLOperator(task_id="test", sql="select * from 1", python_callable=lambda: 1).get_source_code(
py_callable=None
)
9 changes: 9 additions & 0 deletions python-sdk/tests/sql/operators/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import astro.sql as aql
from astro.airflow.datasets import DATASET_SUPPORT
from astro.files import File
from astro.sql.operators.dataframe import DataframeOperator
from astro.table import Table

from ..operators import utils as test_utils
Expand Down Expand Up @@ -235,3 +236,11 @@ def count_df(df: pandas.DataFrame):
test_utils.run_dag(sample_dag)
mock_serde.serialize.assert_not_called()
mock_serde.deserialize.assert_not_called()


@pytest.mark.parametrize("exception", [OSError("os error"), TypeError("type error")])
@mock.patch("astro.sql.operators.base_decorator.inspect.getsource", autospec=True)
def test_get_source_code_handle_exception(mock_getsource, exception):
"""assert get_source_code not raise exception"""
mock_getsource.side_effect = exception
DataframeOperator(task_id="test", python_callable=lambda: 1).get_source_code(py_callable=None)
4 changes: 4 additions & 0 deletions python-sdk/tests_integration/extractors/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ def top_five_animations(input_table: Table) -> str:
assert task_meta_extract is None
task_meta = python_sdk_extractor(task.operator).extract_on_complete(task_instance=task_instance)
assert task_meta.name == f"adhoc_airflow.{task_id}"
source_code = task_meta.job_facets.get("sourceCode")
# check for transform code return is present in source code facet.
validate_string = """return "SELECT title, rating FROM {{ input_table }} LIMIT 5;"""
assert validate_string in source_code.source
assert len(task_meta.job_facets) > 0
assert task_meta.run_facets == {}

Expand Down