From c98642b53c8c2d627850e0415c01ca1b3b3a9e53 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 11:15:58 +0000 Subject: [PATCH] Emit telemetry to Scarf during DAG run (#1397) Export telemetry related to Cosmos usage to [Scarf](https://about.scarf.sh/). This data assists the project maintainers in better understanding how Cosmos is used. Insights from this telemetry are critical for prioritizing patches, minor releases, and security fixes. Additionally, this information supports critical decisions related to the development road map. Deployments and individual users can opt out of analytics by setting the configuration: ``` [cosmos] enable_telemetry: False ``` As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt-out by setting one of the following environment variables: ```commandline AIRFLOW__COSMOS__ENABLE_TELEMETRY=False DO_NOT_TRACK=True SCARF_NO_ANALYTICS=True ``` In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs: - Cosmos version - Airflow version - Python version - Operating system & machine architecture - Event type - DAG hash - Total tasks - Total Cosmos tasks No user-identifiable information (IP included) is stored in Scarf, even though Scarf infers information from the IP, such as location, and stores that. The data collection is GDPR compliant. The Apache Foundation supports this same strategy in many of its OpenSource projects, including Airflow ([#39510](https://github.com/apache/airflow/pull/39510)). 
Example of visualisation of the data via the Scarf UI: Screenshot 2024-12-19 at 10 22 59 Screenshot 2024-12-19 at 10 23 13 Screenshot 2024-12-19 at 10 23 21 Screenshot 2024-12-19 at 10 23 28 Screenshot 2024-12-19 at 10 23 51 Screenshot 2024-12-19 at 10 24 01 Screenshot 2024-12-19 at 10 24 11 Screenshot 2024-12-19 at 10 24 20 Screenshot 2024-12-19 at 10 24 31 Screenshot 2024-12-19 at 10 24 39 Screenshot 2024-12-19 at 10 24 48 Closes: #1143 --- PRIVACY_NOTICE.rst | 41 ++++++++ README.rst | 4 +- cosmos/constants.py | 4 + cosmos/listeners/__init__.py | 0 cosmos/listeners/dag_run_listener.py | 84 +++++++++++++++ cosmos/plugin/__init__.py | 2 + cosmos/settings.py | 22 +++- cosmos/telemetry.py | 77 ++++++++++++++ docs/index.rst | 6 +- tests/listeners/test_dag_run_listener.py | 127 +++++++++++++++++++++++ tests/test_telemetry.py | 115 ++++++++++++++++++++ 11 files changed, 477 insertions(+), 5 deletions(-) create mode 100644 PRIVACY_NOTICE.rst create mode 100644 cosmos/listeners/__init__.py create mode 100644 cosmos/listeners/dag_run_listener.py create mode 100644 cosmos/telemetry.py create mode 100644 tests/listeners/test_dag_run_listener.py create mode 100644 tests/test_telemetry.py diff --git a/PRIVACY_NOTICE.rst b/PRIVACY_NOTICE.rst new file mode 100644 index 000000000..7477ee795 --- /dev/null +++ b/PRIVACY_NOTICE.rst @@ -0,0 +1,41 @@ +Privacy Notice +============== + +This project follows the `Privacy Policy of Astronomer `_. + +Collection of Data +------------------ + +Astronomer Cosmos integrates `Scarf `_ to collect basic telemetry data during operation. +This data assists the project maintainers in better understanding how Cosmos is used. +Insights gained from this telemetry are critical for prioritizing patches, minor releases, and +security fixes. Additionally, this information supports key decisions related to the development road map. + +Deployments and individual users can opt-out of analytics by setting the configuration: + + +.. 
code-block:: + + [cosmos] enable_telemetry = False + + +As described in the `official documentation `_, it is also possible to opt out by setting one of the following environment variables: + +.. code-block:: + + DO_NOT_TRACK=True + SCARF_NO_ANALYTICS=True + + +In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs: + +- Cosmos version +- Airflow version +- Python version +- Operating system & machine architecture +- Event type +- The DAG hash +- Total tasks +- Total Cosmos tasks + +No user-identifiable information (IP included) is stored in Scarf. diff --git a/README.rst b/README.rst index 7eb32bcac..e35b8a913 100644 --- a/README.rst +++ b/README.rst @@ -82,7 +82,9 @@ _______ Privacy Notice ______________ -This project follows `Astronomer's Privacy Policy `_ +The application and this website collect telemetry to support the project's development. These can be disabled by the end-users. + +Read the `Privacy Notice `_ to learn more about it. ..
Tracking pixel for Scarf diff --git a/cosmos/constants.py b/cosmos/constants.py index 8378e8d10..0513d50d2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -160,3 +160,7 @@ def _missing_value_(cls, value): # type: ignore TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED} DBT_COMPILE_TASK_ID = "dbt_compile" + +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}/{cosmos_task_count}" +TELEMETRY_VERSION = "v1" +TELEMETRY_TIMEOUT = 1.0 diff --git a/cosmos/listeners/__init__.py b/cosmos/listeners/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py new file mode 100644 index 000000000..0314c3474 --- /dev/null +++ b/cosmos/listeners/dag_run_listener.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from airflow.listeners import hookimpl +from airflow.models.dag import DAG +from airflow.models.dagrun import DagRun + +from cosmos import telemetry +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +class EventStatus: + SUCCESS = "success" + FAILED = "failed" + + +DAG_RUN = "dag_run" + + +def total_cosmos_tasks(dag: DAG) -> int: + """ + Identify if there are any Cosmos DAGs on a given serialized `airflow.serialization.serialized_objects.SerializedDAG`. + + The approach is naive, from the perspective it does not take into account subclasses, but it is inexpensive and + works. + """ + cosmos_tasks = 0 + for task in dag.task_dict.values(): + # In a real Airflow deployment, the following `task` is an instance of + # `airflow.serialization.serialized_objects.SerializedBaseOperator` + # and the only reference to Cosmos is in the _task_module. 
+ # It is suboptimal, but works as of Airflow 2.10 + task_module = getattr(task, "_task_module", None) or task.__class__.__module__ + if task_module.startswith("cosmos."): + cosmos_tasks += 1 + return cosmos_tasks + + +# @provide_session +@hookimpl +def on_dag_run_success(dag_run: DagRun, msg: str) -> None: + logger.debug("Running on_dag_run_success") + # In a real Airflow deployment, the following `serialized_dag` is an instance of + # `airflow.serialization.serialized_objects.SerializedDAG` + # and it is not a subclass of DbtDag, nor contain any references to Cosmos + serialized_dag = dag_run.get_dag() + + if not total_cosmos_tasks(serialized_dag): + logger.debug("The DAG does not use Cosmos") + return + + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "task_count": len(serialized_dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(serialized_dag), + } + + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + logger.debug("Completed on_dag_run_success") + + +@hookimpl +def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: + logger.debug("Running on_dag_run_failed") + # In a real Airflow deployment, the following `serialized_dag` is an instance of + # `airflow.serialization.serialized_objects.SerializedDAG` + # and it is not a subclass of DbtDag, nor contain any references to Cosmos + serialized_dag = dag_run.get_dag() + + if not total_cosmos_tasks(serialized_dag): + logger.debug("The DAG does not use Cosmos") + return + + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + "task_count": len(serialized_dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(serialized_dag), + } + + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + logger.debug("Completed on_dag_run_failed") diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index 5997a5fe3..4bbea4fa2 100644 --- 
a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -10,6 +10,7 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose +from cosmos.listeners import dag_run_listener from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud if in_astro_cloud: @@ -269,3 +270,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] + listeners = [dag_run_listener] diff --git a/cosmos/settings.py b/cosmos/settings.py index 5b24321c8..ba9da106a 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -37,12 +37,28 @@ remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None) remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None) +AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") + +# The following environment variable is populated in Astro Cloud +in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" + try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) -AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") -# The following environment variable is populated in Astro Cloud -in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" +def convert_to_boolean(value: str | None) -> bool: + """ + Convert a string that represents a boolean to a Python boolean. 
+ """ + value = str(value).lower().strip() + if value in ("f", "false", "0", "", "none"): + return False + return True + + +# Telemetry-related settings +enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True) +do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK")) +no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS")) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py new file mode 100644 index 000000000..0e267b28b --- /dev/null +++ b/cosmos/telemetry.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import platform +from urllib import parse +from urllib.parse import urlencode + +import httpx +from airflow import __version__ as airflow_version + +import cosmos +from cosmos import constants, settings +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +def should_emit() -> bool: + """ + Identify if telemetry metrics should be emitted or not. + """ + return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics + + +def collect_standard_usage_metrics() -> dict[str, object]: + """ + Return standard telemetry metrics. + """ + metrics = { + "cosmos_version": cosmos.__version__, # type: ignore[attr-defined] + "airflow_version": parse.quote(airflow_version), + "python_version": platform.python_version(), + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "variables": {}, + } + return metrics + + +def emit_usage_metrics(metrics: dict[str, object]) -> bool: + """ + Emit desired telemetry metrics to remote telemetry endpoint. + + The metrics must contain the necessary fields to build the TELEMETRY_URL. + """ + query_string = urlencode(metrics) + telemetry_url = constants.TELEMETRY_URL.format( + **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string + ) + logger.debug("Telemetry is enabled. 
Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) + if not response.is_success: + logger.warning( + "Unable to emit usage metrics to %s. Status code: %s. Message: %s", + telemetry_url, + response.status_code, + response.text, + ) + return response.is_success + + +def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool: + """ + Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics + and emit them to remote telemetry endpoint. + + :returns: If the event was successfully sent to the telemetry backend or not. + """ + if should_emit(): + metrics = collect_standard_usage_metrics() + metrics["event_type"] = event_type + metrics["variables"].update(additional_metrics) # type: ignore[attr-defined] + metrics.update(additional_metrics) + is_success = emit_usage_metrics(metrics) + return is_success + else: + logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + return False diff --git a/docs/index.rst b/docs/index.rst index e788bd04c..7a56b8df7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -137,10 +137,14 @@ _______ `Apache License 2.0 `_ + Privacy Notice ______________ -This project follows `Astronomer's Privacy Policy `_ +The application and this website collect telemetry to support the project's development. These can be disabled by the end-users. + +Read the `Privacy Notice `_ to learn more about it. + .. Tracking pixel for Scarf .. 
raw:: html diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py new file mode 100644 index 000000000..a547f20ad --- /dev/null +++ b/tests/listeners/test_dag_run_listener.py @@ -0,0 +1,127 @@ +import logging +import uuid +from datetime import datetime +from pathlib import Path +from unittest.mock import patch + +import pytest +from airflow.models import DAG +from airflow.utils.state import State + +from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig +from cosmos.airflow.dag import DbtDag +from cosmos.airflow.task_group import DbtTaskGroup +from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" +DBT_PROJECT_NAME = "jaffle_shop" + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +@pytest.mark.integration +def test_is_cosmos_dag_is_true(): + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + assert total_cosmos_tasks(dag) == 13 + + +@pytest.mark.integration +def test_total_cosmos_tasks_in_task_group(): + with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: + _ = DbtTaskGroup( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + ) + + assert total_cosmos_tasks(dag) == 13 + + +def test_total_cosmos_tasks_is_one(): + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtRunLocalOperator( + profile_config=profile_config, + project_dir=DBT_ROOT_PATH / "jaffle_shop", + task_id="run", + install_deps=True, + 
append_env=True, + ) + run_operator + + assert total_cosmos_tasks(dag) == 1 + + +def test_not_cosmos_dag(): + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + pass + + assert total_cosmos_tasks(dag) == 0 + + +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + run_id = str(uuid.uuid1()) + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + ) + + on_dag_run_success(dag_run, msg="test success") + assert "Running on_dag_run_success" in caplog.text + assert "Completed on_dag_run_success" in caplog.text + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + run_id = str(uuid.uuid1()) + dag_run = dag.create_dagrun( + state=State.FAILED, + run_id=run_id, + ) + + on_dag_run_failed(dag_run, msg="test failed") + assert "Running on_dag_run_failed" in caplog.text + assert "Completed on_dag_run_failed" in caplog.text + assert mock_emit_usage_metrics_if_enabled.call_count == 1 diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 000000000..b11caabe1 --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,115 @@ +import logging +from unittest.mock import patch + +import pytest + +from cosmos import telemetry + + +def 
test_should_emit_is_true_by_default(): + assert telemetry.should_emit() + + +@patch("cosmos.settings.enable_telemetry", True) +def test_should_emit_is_true_when_only_enable_telemetry_is_true(): + assert telemetry.should_emit() + + +@patch("cosmos.settings.do_not_track", True) +def test_should_emit_is_false_when_do_not_track(): + assert not telemetry.should_emit() + + +@patch("cosmos.settings.no_analytics", True) +def test_should_emit_is_false_when_no_analytics(): + assert not telemetry.should_emit() + + +def test_collect_standard_usage_metrics(): + metrics = telemetry.collect_standard_usage_metrics() + expected_keys = [ + "airflow_version", + "cosmos_version", + "platform_machine", + "platform_system", + "python_version", + "variables", + ] + assert sorted(metrics.keys()) == expected_keys + + +class MockFailedResponse: + is_success = False + status_code = "404" + text = "Non existent URL" + + +@patch("cosmos.telemetry.httpx.get", return_value=MockFailedResponse()) +def test_emit_usage_metrics_fails(mock_httpx_get, caplog): + sample_metrics = { + "cosmos_version": "1.8.0a4", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "event_type": "dag_run", + "status": "success", + "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", + "task_count": 3, + "cosmos_task_count": 3, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + mock_httpx_get.assert_called_once_with( + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3""", + timeout=1.0, + follow_redirects=True, + ) + assert not is_success + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3. Status code: 404. 
Message: Non existent URL""" + assert caplog.text.startswith("WARNING") + assert log_msg in caplog.text + + +@pytest.mark.integration +def test_emit_usage_metrics_succeeds(caplog): + caplog.set_level(logging.DEBUG) + sample_metrics = { + "cosmos_version": "1.8.0a4", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "event_type": "dag_run", + "status": "success", + "dag_hash": "dag-hash-ci", + "task_count": 33, + "cosmos_task_count": 33, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + assert is_success + assert caplog.text.startswith("DEBUG") + assert "Telemetry is enabled. Emitting the following usage metrics to" in caplog.text + + +@patch("cosmos.telemetry.should_emit", return_value=False) +def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog): + caplog.set_level(logging.DEBUG) + assert not telemetry.emit_usage_metrics_if_enabled("any", {}) + assert caplog.text.startswith("DEBUG") + assert "Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True." in caplog.text + + +@patch("cosmos.telemetry.should_emit", return_value=True) +@patch("cosmos.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "k2": "v2", "variables": {}}) +@patch("cosmos.telemetry.emit_usage_metrics") +def test_emit_usage_metrics_if_enabled_succeeds( + mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit +): + assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"}) + mock_emit_usage_metrics.assert_called_once() + assert mock_emit_usage_metrics.call_args.args[0] == { + "k1": "v1", + "k2": "v2", + "event_type": "any", + "variables": {"k2": "v2"}, + }