Skip to content

Commit

Permalink
Conti/emit tested integration versions (#140)
Browse files Browse the repository at this point in the history
Add support for tracking tested integrations
  • Loading branch information
wconti27 authored Oct 11, 2023
1 parent f127b47 commit 34e41d9
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 7 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,29 @@ summary = {

```
curl -X GET 'http://0.0.0.0:8126/test/trace_check/summary'
```

### /test/session/integrations (PUT)
Update information about the current tested integration.

#### [optional] `?test_session_token=`
#### [optional] `X-Datadog-Test-Session-Token`

```
curl -X PUT 'http://0.0.0.0:8126/test/session/integrations' -d '{"integration_name": [INTEGRATION_NAME], "integration_version": [INTEGRATION_VERSION],
"dependency_name": [DEPENDENCY_NAME], "tracer_language": [TRACER_LANGUAGE], "tracer_version": [TRACER_VERSION]}'
```

### /test/integrations/tested_versions (GET)
Return a csv list of all tested integrations received by the agent. The format of returned data will be:
`tracer_language,tracer_version,integration_name,integration_version,dependency_name`.

#### [optional] `?test_session_token=`
#### [optional] `X-Datadog-Test-Session-Token`

```
curl -X GET 'http://0.0.0.0:8126/test/integrations/tested_versions'
```

### /v0.1/pipeline_stats

Expand Down
109 changes: 105 additions & 4 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .checks import CheckTrace
from .checks import Checks
from .checks import start_trace
from .integration import Integration
from .remoteconfig import RemoteConfigServer
from .trace import Span
from .trace import Trace
Expand Down Expand Up @@ -112,7 +113,6 @@ async def _forward_request(request_data, headers, full_agent_url):
response_data = await resp.read()
if len(response_data) == 0:
log.info("Received empty response: %r from agent.", response_data)
return web.HTTPOk()
else:
if isinstance(response_data, bytes):
response_data = response_data.decode()
Expand All @@ -123,15 +123,13 @@ async def _forward_request(request_data, headers, full_agent_url):
log.warning("Original Request: %r", request_data)
response_data = {}
log.info("Response %r from agent:", response_data)
return web.json_response(data=response_data)
elif "text/plain" in resp.content_type:
response_data = await resp.text()
log.info("Response %r from agent:", response_data)
return web.Response(text=response_data, content_type="text/plain")
else:
response_data = await resp.json()
log.info("Response %r from agent:", response_data)
return web.json_response(data=response_data)
return resp


async def _prepare_and_send_request(data, request, headers):
Expand Down Expand Up @@ -387,6 +385,58 @@ async def _tracestats_by_session(self, token: Optional[str]) -> List[v06StatsPay
stats.append(s)
return stats

async def _integration_requests_by_session(
self, token: Optional[str], include_sent_integrations: Optional[bool] = False
) -> List[Request]:
"""Get all requests with an associated tested Integration."""
integration_requests: List[Request] = []
for req in self._requests_by_session(token):
# see if the request was to update with a newly tested integration
if req.match_info.handler == self.handle_put_tested_integrations:
if "integration" not in req:
data = json.loads(await req.read())
integration_name = data.get("integration_name", None)
integration_version = data.get("integration_version", None)
req["integration"] = Integration(
integration_name=integration_name,
integration_version=integration_version,
dependency_name=data.get("dependency_name", integration_name),
)
req["tracer_version"] = data.get("tracer_version", None)
req["tracer_language"] = data.get("tracer_language", None)
integration_requests.append(req)
elif include_sent_integrations:
integration_requests.append(req)
# check if integration data was provided in the trace request instead
elif (
"_dd_trace_env_variables" in req
and "DD_INTEGRATION" in req["_dd_trace_env_variables"]
and "DD_INTEGRATION_VERSION" in req["_dd_trace_env_variables"]
):
integration_name = req["_dd_trace_env_variables"]["DD_INTEGRATION"]
integration_version = req["_dd_trace_env_variables"]["DD_INTEGRATION_VERSION"]

if "integration" not in req:
req["integration"] = Integration(
integration_name=integration_name,
integration_version=integration_version,
dependency_name=req["_dd_trace_env_variables"].get("DD_DEPENDENCY_NAME", integration_name),
)

if req.headers.get("dd-client-library-version", None):
req["tracer_version"] = req.headers.get("dd-client-library-version")
elif req.headers.get("datadog-meta-tracer-version", None):
req["tracer_version"] = req.headers.get("datadog-meta-tracer-version")

if req.headers.get("dd-client-library-language", None):
req["tracer_language"] = req.headers.get("dd-client-library-language")
elif req.headers.get("datadog-meta-lang", None):
req["tracer_language"] = req.headers.get("datadog-meta-lang")
integration_requests.append(req)
elif include_sent_integrations:
integration_requests.append(req)
return integration_requests

def _decode_v04_traces(self, request: Request) -> v04TracePayload:
content_type = request.content_type
raw_data = self._request_data(request)
Expand Down Expand Up @@ -459,6 +509,55 @@ async def handle_v2_apmtelemetry(self, request: Request) -> web.Response:
# TODO: Snapshots
return web.HTTPOk()

async def handle_put_tested_integrations(self, request: Request) -> web.Response:
# we need to store the request manually since this is not a real DD agent endpoint
await self._store_request(request)
return web.HTTPOk()

async def handle_get_tested_integrations(self, request: Request) -> web.Response:
"""Return all tested integrations according to integration data received by agent."""
text_headers = ["language_name", "tracer_version", "integration_name", "integration_version", "dependency_name"]
aggregated_text = ""
seen_integrations = set()
req_headers = {}

# get all requests associated with an integration
reqs = await self._integration_requests_by_session(
token=_session_token(request), include_sent_integrations=True
)
for req in reqs:
integration = req["integration"]

# only include the integration in response if all data is included and integration hasn't already been added
if (
integration.integration_name
and integration.integration_version
and integration.dependency_name
and req["tracer_language"]
and req["tracer_version"]
and f"{integration.integration_name}@{integration.integration_version}" not in seen_integrations
):
aggregated_text += (
",".join(
[
req["tracer_language"],
".".join(req["tracer_version"].split("-")[0].split(".")[0:3]), # ensure semver
integration.integration_name,
integration.integration_version,
integration.dependency_name,
]
)
+ "\n"
)
# update seen integrations to skip this specific integration and version next loop from another request
seen_integrations.add(f"{integration.integration_name}@{integration.integration_version}")
# given that we will mainly see one integration per call, set a header for the calling lib to know the
# integration name
req_headers["file-name"] = integration.integration_name
if len(aggregated_text) > 0:
aggregated_text = ",".join(text_headers) + "\n" + aggregated_text
return web.Response(body=aggregated_text, content_type="text/plain", headers=req_headers)

async def handle_info(self, request: Request) -> web.Response:
return web.json_response(
{
Expand Down Expand Up @@ -898,13 +997,15 @@ def make_app(
web.post("/test/session/responses/config", agent.handle_v07_remoteconfig_create),
web.post("/test/session/responses/config/path", agent.handle_v07_remoteconfig_path_create),
web.put("/test/session/responses/config", agent.handle_v07_remoteconfig_put),
web.put("/test/session/integrations", agent.handle_put_tested_integrations),
web.get("/test/traces", agent.handle_test_traces),
web.get("/test/apmtelemetry", agent.handle_test_apmtelemetry),
# web.get("/test/benchmark", agent.handle_test_traces),
web.get("/test/trace/analyze", agent.handle_trace_analyze),
web.get("/test/trace_check/failures", agent.get_trace_check_failures),
web.get("/test/trace_check/clear", agent.clear_trace_check_failures),
web.get("/test/trace_check/summary", agent.get_trace_check_summary),
web.get("/test/integrations/tested_versions", agent.handle_get_tested_integrations),
]
)
checks = Checks(
Expand Down
5 changes: 5 additions & 0 deletions ddapm_test_agent/integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class Integration:
def __init__(self, integration_name: str, integration_version: str, dependency_name: str):
self.integration_name = integration_name
self.integration_version = integration_version
self.dependency_name = dependency_name
4 changes: 2 additions & 2 deletions ddapm_test_agent/remoteconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def _update_response(self, token: str, data: Dict[str, Any]) -> None:
def _create_response(self, token: str, data: Dict[str, Any]) -> None:
self._responses[token] = data

async def _get_response(self, token: str) -> Dict[str, Any]:
async def _get_response(self, token: str) -> Any:
return self._responses.get(token, {})

def update_config_response(self, token: str, data: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -90,5 +90,5 @@ def create_config_path_response(self, token: str, path: str, msg: str) -> None:
def create_config_response(self, token: str, data: Dict[str, Any]) -> None:
self._create_response(token, data)

async def get_config_response(self, token: str) -> Dict[str, Any]:
async def get_config_response(self, token: str) -> Any:
return await self._get_response(token)
2 changes: 1 addition & 1 deletion ddapm_test_agent/trace_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def check(self, trace: List[Span], dd_config_env: Dict[str, str]) -> None:
if component != "":
dd_service = dd_config_env.get(f"DD_{component.upper()}_SERVICE", None)
if not dd_service:
log.error("DD_SERVICE not set for component: %s. Args %s", component, dd_config_env)
log.debug("DD_SERVICE not set for component: %s. Args %s", component, dd_config_env)
dd_service = dd_service or dd_config_env.get("DD_SERVICE", None)
if dd_service is None:
self.fail(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
Add support for tracking the integrations being tested. Tracked integrations can be recorded by a [PUT] request to the Test Agent
at `/test/session/integrations`. To get data about which integrations the Test Agent encountered, make a [GET] request to `/test/integrations/tested_versions`.
Tested integrations include information such as the integration name, the tested integration version, the tracer language, the tracer version and the
dependency name of the integration.
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ def v04_reference_http_trace_payload_headers() -> Dict[str, str]:
"Content-Type": "application/msgpack",
"X-Datadog-Trace-Count": "1",
"Datadog-Meta-Tracer-Version": "v0.1",
"datadog-meta-lang": "python",
"X-Datadog-Trace-Env-Variables": "DD_INTEGRATION=express,DD_INTEGRATION_VERSION=1.2.3",
}
return headers

Expand Down
49 changes: 49 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,52 @@ async def test_get_trace_failures_and_clear_json(
response = await agent.get("/test/trace_check/summary?return_all=true&use_json=true")
assert response.status == 200
assert await response.json() == {}


async def test_integrations_from_trace(
agent, v04_reference_http_trace_payload_headers, v04_reference_http_trace_payload_data
):
resp = await agent.put(
"/v0.4/traces",
headers=v04_reference_http_trace_payload_headers,
data=v04_reference_http_trace_payload_data,
)
assert resp.status == 200, await resp.text()

resp = await agent.get("/test/integrations/tested_versions")
assert resp.status == 200

text = await resp.text()

assert (
text
== "language_name,tracer_version,integration_name,integration_version,dependency_name\npython,v0.1,express,1.2.3,express\n"
)


async def test_put_integrations(
agent,
):
resp = await agent.put(
"/test/session/integrations",
data=json.dumps(
{
"integration_name": "flask",
"integration_version": "1.1.1",
"dependency_name": "not_flask",
"tracer_version": "v1",
"tracer_language": "python",
}
),
)
assert resp.status == 200, await resp.text()

resp = await agent.get("/test/integrations/tested_versions")
assert resp.status == 200

text = await resp.text()

assert (
text
== "language_name,tracer_version,integration_name,integration_version,dependency_name\npython,v1,flask,1.1.1,not_flask\n"
)

0 comments on commit 34e41d9

Please sign in to comment.