Skip to content

Commit

Permalink
Fix httpx.get exception handling while emitting telemetry (#1439)
Browse files Browse the repository at this point in the history
Closes: #1438

A user reported httpx raised an exception while trying to emit
telemetry, leading the DAG to not being successful. This PR aims to
solve the issue:

```
File "/usr/local/lib/python3.12/site-packages/cosmos/listeners/dag_run_listener.py", line 60, in on_dag_run_success
     telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
   File "/usr/local/lib/python3.12/site-packages/cosmos/telemetry.py", line 73, in emit_usage_metrics_if_enabled
     is_success = emit_usage_metrics(metrics)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/cosmos/telemetry.py", line 50, in emit_usage_metrics
     response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_api.py", line 198, in get
     return request(
            ^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_api.py", line 106, in request
     return client.request(
            ^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 827, in request
     return self.send(request, auth=auth, follow_redirects=follow_redirects)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 914, in send
     response = self._send_handling_auth(
                ^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 942, in _send_handling_auth
     response = self._send_handling_redirects(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
     response = self._send_single_request(request)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1015, in _send_single_request
     response = transport.handle_request(request)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 232, in handle_request
     with map_httpcore_exceptions():
          ^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
     self.gen.throw(value)
   File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
     raise mapped_exc(message) from exc
 httpx.ConnectError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1000)
```
  • Loading branch information
tatiana authored Jan 3, 2025
1 parent a612b23 commit e002858
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 8 deletions.
22 changes: 15 additions & 7 deletions cosmos/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,23 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool:
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logger.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
if not response.is_success:
try:
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
except httpx.HTTPError as e:
logger.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
telemetry_url,
response.status_code,
response.text,
"Unable to emit usage metrics to %s. An HTTPX connection error occurred: %s.", telemetry_url, str(e)
)
return response.is_success
is_success = False
else:
is_success = response.is_success
if not is_success:
logger.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
telemetry_url,
response.status_code,
response.text,
)
return is_success


def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool:
Expand Down
29 changes: 28 additions & 1 deletion tests/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from unittest.mock import patch

import httpx
import pytest

from cosmos import telemetry
Expand Down Expand Up @@ -45,7 +46,7 @@ class MockFailedResponse:


@patch("cosmos.telemetry.httpx.get", return_value=MockFailedResponse())
def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
def test_emit_usage_metrics_is_unsuccessful(mock_httpx_get, caplog):
sample_metrics = {
"cosmos_version": "1.8.0a4",
"airflow_version": "2.10.1",
Expand All @@ -70,6 +71,32 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
assert log_msg in caplog.text


@patch("cosmos.telemetry.httpx.get", side_effect=httpx.ConnectError(message="Something is not right"))
def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
sample_metrics = {
"cosmos_version": "1.8.0a4",
"airflow_version": "2.10.1",
"python_version": "3.11",
"platform_system": "darwin",
"platform_machine": "amd64",
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"task_count": 3,
"cosmos_task_count": 3,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
mock_httpx_get.assert_called_once_with(
f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3""",
timeout=1.0,
follow_redirects=True,
)
assert not is_success
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3. An HTTPX connection error occurred: Something is not right."""
assert caplog.text.startswith("WARNING")
assert log_msg in caplog.text


@pytest.mark.integration
def test_emit_usage_metrics_succeeds(caplog):
caplog.set_level(logging.DEBUG)
Expand Down

0 comments on commit e002858

Please sign in to comment.