Skip to content

Commit

Permalink
feat(telemetry): add initial support for capturing telemetry event re…
Browse files Browse the repository at this point in the history
…quests (#91)

* feat(telemetry): add initial support for capturing telemetry event requests

* add telemetry to info endpoint
  • Loading branch information
brettlangdon authored Jul 22, 2022
1 parent ac99f86 commit 2653859
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 2 deletions.
73 changes: 73 additions & 0 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from . import _get_version
from . import trace_snapshot
from . import tracestats_snapshot
from .apmtelemetry import TelemetryEvent
from .apmtelemetry import v2_decode as v2_apmtelemetry_decode
from .checks import CheckTrace
from .checks import Checks
from .checks import start_trace
Expand Down Expand Up @@ -130,9 +132,26 @@ async def traces(self) -> TraceMap:
_traces[trace_id].append(s)
return _traces

async def apmtelemetry(self) -> List[TelemetryEvent]:
"""Return the telemetry events stored by the agent"""
_events: List[TelemetryEvent] = []
for req in reversed(self._requests):
if req.match_info.handler == self.handle_v2_apmtelemetry:
_events.append(v2_apmtelemetry_decode(await req.read()))
return _events

async def _trace_by_trace_id(self, trace_id: int) -> Trace:
return (await self.traces())[trace_id]

async def _apmtelemetry_by_runtime_id(
self, runtime_id: str
) -> List[TelemetryEvent]:
return [
event
for event in await self.apmtelemetry()
if event["runtime_id"] == runtime_id
]

def _requests_by_session(self, token: Optional[str]) -> List[Request]:
"""Return the latest requests sent with the given token.
Expand Down Expand Up @@ -176,6 +195,23 @@ async def _traces_by_session(self, token: Optional[str]) -> List[Trace]:
tracemap[trace_id].append(span)
return list(tracemap.values())

async def _apmtelemetry_by_session(
self, token: Optional[str]
) -> List[TelemetryEvent]:
"""Return the telemetry events that belong to the given session token.
If token is None or if the token was used to manually start a session
with /session-start then return all telemetry events that were sent since
the last /session-start request was made.
"""
events: List[TelemetryEvent] = []
for req in self._requests_by_session(token):
if req.match_info.handler == self.handle_v2_apmtelemetry:
events.append(v2_apmtelemetry_decode(await req.read()))

# TODO: Sort the events?
return events

async def _tracestats_by_session(
self, token: Optional[str]
) -> List[v06StatsPayload]:
Expand Down Expand Up @@ -216,6 +252,13 @@ async def handle_v06_tracestats(self, request: Request) -> web.Response:
)
return web.HTTPOk()

async def handle_v2_apmtelemetry(self, request: Request) -> web.Response:
self._requests.append(request)
v2_apmtelemetry_decode(await request.read())
# TODO: Validation
# TODO: Snapshots
return web.HTTPOk()

async def handle_info(self, request: Request) -> web.Response:
return web.json_response(
{
Expand All @@ -224,6 +267,7 @@ async def handle_info(self, request: Request) -> web.Response:
"/v0.4/traces",
"/v0.5/traces",
"/v0.6/stats",
"/telemetry/proxy/",
],
"feature_flags": [],
"config": {},
Expand Down Expand Up @@ -405,6 +449,11 @@ async def handle_session_traces(self, request: Request) -> web.Response:
traces = await self._traces_by_session(token)
return web.json_response(traces)

async def handle_session_apmtelemetry(self, request: Request) -> web.Response:
token = request["session_token"]
events = await self._apmtelemetry_by_session(token)
return web.json_response(events)

async def handle_session_tracestats(self, request: Request) -> web.Response:
token = request["session_token"]
stats = await self._tracestats_by_session(token)
Expand All @@ -418,6 +467,7 @@ async def handle_session_requests(self, request: Request) -> web.Response:
self.handle_v04_traces,
self.handle_v05_traces,
self.handle_v06_tracestats,
self.handle_v2_apmtelemetry,
):
continue
resp.append(
Expand Down Expand Up @@ -451,6 +501,24 @@ async def handle_test_traces(self, request: Request) -> web.Response:
traces = list((await self.traces()).values())
return web.json_response(data=traces)

async def handle_test_apmtelemetry(self, request: Request) -> web.Response:
"""Return requested telemetry events as JSON.
Telemetry events can be requested by providing a header X-Datadog-Runtime-Ids or
a query param runtime_ids.
"""
raw_runtime_ids = request.url.query.get(
"runtime_ids", request.headers.get("X-Datadog-Runtime-Ids", "")
)
if raw_runtime_ids:
runtime_ids = raw_runtime_ids.split(",")
events: List[TelemetryEvent] = []
for rid in runtime_ids:
events.extend(await self._apmtelemetry_by_runtime_id(rid))
else:
events = await self.apmtelemetry()
return web.json_response(data=events)

async def handle_session_clear(self, request: Request) -> web.Response:
"""Clear traces by session token or all traces if none is provided."""
session_token = request["session_token"]
Expand Down Expand Up @@ -509,14 +577,19 @@ def make_app(
web.put("/v0.5/traces", agent.handle_v05_traces),
web.post("/v0.6/stats", agent.handle_v06_tracestats),
web.put("/v0.6/stats", agent.handle_v06_tracestats),
web.post(
"/telemetry/proxy/api/v2/apmtelemetry", agent.handle_v2_apmtelemetry
),
web.get("/info", agent.handle_info),
web.get("/test/session/start", agent.handle_session_start),
web.get("/test/session/clear", agent.handle_session_clear),
web.get("/test/session/snapshot", agent.handle_snapshot),
web.get("/test/session/traces", agent.handle_session_traces),
web.get("/test/session/apmtelemetry", agent.handle_session_apmtelemetry),
web.get("/test/session/stats", agent.handle_session_tracestats),
web.get("/test/session/requests", agent.handle_session_requests),
web.get("/test/traces", agent.handle_test_traces),
web.get("/test/apmtelemetry", agent.handle_test_apmtelemetry),
# web.get("/test/benchmark", agent.handle_test_traces),
web.get("/test/trace/analyze", agent.handle_trace_analyze),
]
Expand Down
13 changes: 13 additions & 0 deletions ddapm_test_agent/apmtelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import json
from typing import Any
from typing import Dict
from typing import cast


TelemetryEvent = Dict[str, Any]


def v2_decode(data: bytes) -> TelemetryEvent:
"""Decode v2 apm telemetry request data as a dict"""
# TODO: Handle decoding into a telemetry payload object
return cast(TelemetryEvent, json.loads(data))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Add support for ``POST /telemetry/proxy/api/v2/apmtelemetry`` endpoint and tracking telemetry requests.
124 changes: 124 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from ddapm_test_agent.agent import _parse_csv
from ddapm_test_agent.agent import make_app
from ddapm_test_agent.apmtelemetry import TelemetryEvent
from ddapm_test_agent.trace import Trace
from ddapm_test_agent.trace_snapshot import DEFAULT_SNAPSHOT_IGNORES

Expand Down Expand Up @@ -263,3 +264,126 @@ def fn(token: Optional[str] = None) -> Awaitable[Response]:
)

yield fn


@pytest.fixture
def v2_reference_http_apmtelemetry_payload_data_raw():
data = {
"tracer_time": 1658439039,
"runtime_id": "3cac6e9599564813977aace04bf37d57",
"api_version": "v1",
"seq_id": 1,
"application": {
"service_name": "my-svc",
"service_version": "1.0.0",
"env": "prod",
"language_name": "python",
"language_version": "3.9.10",
"tracer_version": "1.3.0",
"runtime_name": "CPython",
"runtime_version": "3.9.10",
},
"host": {
"os": "macOS-12.4",
"hostname": "HELLO-COMPUTER",
"os_version": "12.4",
"kernel_name": "Darwin",
"kernel_release": "21.5.0",
"kernel_version": "Darwin Kernel Version 21.5.0: Tue Apr 26 21:08:22 PDT 2022; root:xnu-8020.121.3~4/RELEASE_X86_64",
"container_id": "",
},
"payload": {
"dependencies": [
{"name": "pyparsing", "version": "3.0.9"},
{"name": "pytest-mock", "version": "3.8.2"},
{"name": "setuptools", "version": "62.6.0"},
{"name": "sortedcontainers", "version": "2.4.0"},
{"name": "attrs", "version": "21.4.0"},
{"name": "wheel", "version": "0.37.1"},
{"name": "protobuf", "version": "4.21.2"},
{"name": "packaging", "version": "21.3"},
{"name": "tomli", "version": "2.0.1"},
{"name": "msgpack", "version": "1.0.4"},
{"name": "bytecode", "version": "0.13.0"},
{"name": "pip", "version": "22.1.2"},
{"name": "py", "version": "1.11.0"},
{"name": "ddsketch", "version": "2.0.3"},
{"name": "coverage", "version": "6.4.2"},
{"name": "pytest-cov", "version": "3.0.0"},
{"name": "iniconfig", "version": "1.1.1"},
{"name": "py-cpuinfo", "version": "8.0.0"},
{"name": "toml", "version": "0.10.2"},
{"name": "pluggy", "version": "1.0.0"},
{"name": "mock", "version": "4.0.3"},
{"name": "six", "version": "1.16.0"},
{"name": "opentracing", "version": "2.4.0"},
{"name": "pytest", "version": "6.2.5"},
{"name": "ddtrace", "version": "1.3.0"},
{"name": "tenacity", "version": "8.0.1"},
{"name": "hypothesis", "version": "6.45.0"},
],
"integrations": [],
"configurations": [],
},
"request_type": "app-started",
}
yield data


@pytest.fixture
def v2_reference_http_apmtelemetry_payload_data(
v2_reference_http_apmtelemetry_payload_data_raw,
):
yield json.dumps(v2_reference_http_apmtelemetry_payload_data_raw)


@pytest.fixture
def v2_reference_http_apmtelemetry_payload_headers( # type: ignore
v2_reference_http_apmtelemetry_payload_data_raw,
) -> Generator[Dict[str, str], None, None]:
headers = {
"Content-type": "application/json",
"DD-Telemetry-Request-Type": v2_reference_http_apmtelemetry_payload_data_raw[
"request_type"
],
"DD-Telemetry-API-Version": "v1",
}
yield headers


def v2_apmtelemetry( # type: ignore
agent,
event: TelemetryEvent,
token: Optional[str] = None,
):
params = {"test_session_token": token} if token is not None else {}
headers = {
"Content-type": "application/json",
"DD-Telemetry-Request-Type": event["request_type"],
"DD-Telemetry-API-Version": "v1",
}

return agent.post(
"/telemetry/proxy/api/v2/apmtelemetry",
params=params,
headers=headers,
data=json.dumps(event),
)


@pytest.fixture
def do_reference_v2_http_apmtelemetry(
agent,
v2_reference_http_apmtelemetry_payload_headers,
v2_reference_http_apmtelemetry_payload_data,
):
def fn(token: Optional[str] = None) -> Awaitable[Response]:
params = {"test_session_token": token} if token is not None else {}
return agent.post( # type: ignore
"/telemetry/proxy/api/v2/apmtelemetry",
params=params,
headers=v2_reference_http_apmtelemetry_payload_headers,
data=v2_reference_http_apmtelemetry_payload_data,
)

yield fn
38 changes: 38 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,46 @@ async def test_info(agent):
"/v0.4/traces",
"/v0.5/traces",
"/v0.6/stats",
"/telemetry/proxy/",
],
"feature_flags": [],
"config": {},
"client_drop_p0s": True,
}


async def test_apmtelemetry(
agent,
v2_reference_http_apmtelemetry_payload_headers,
v2_reference_http_apmtelemetry_payload_data_raw,
v2_reference_http_apmtelemetry_payload_data,
):
resp = await agent.post(
"/telemetry/proxy/api/v2/apmtelemetry",
headers=v2_reference_http_apmtelemetry_payload_headers,
data=v2_reference_http_apmtelemetry_payload_data,
)
assert resp.status == 200, await resp.text()

rid = v2_reference_http_apmtelemetry_payload_data_raw["runtime_id"]
resp = await agent.get("/test/apmtelemetry", params={"runtime_ids": rid})
assert resp.status == 200
assert json.loads(await resp.text()) == [
v2_reference_http_apmtelemetry_payload_data_raw
]

resp = await agent.get("/test/apmtelemetry")
assert resp.status == 200
assert json.loads(await resp.text()) == [
v2_reference_http_apmtelemetry_payload_data_raw
]

resp = await agent.get("/test/session/clear")
assert resp.status == 200

resp = await agent.get(
"/test/apmtelemetry",
params={"runtime_ids": "e81ece6d-7813-47f2-8337-d342f69626bb"},
)
assert resp.status == 200
assert await resp.text() == "[]"
12 changes: 12 additions & 0 deletions tests/test_apmtelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ddapm_test_agent.apmtelemetry import v2_decode


def test_decode_v2(
v2_reference_http_apmtelemetry_payload_data_raw,
v2_reference_http_apmtelemetry_payload_data,
):
# decode_v2 is just json.loads for now
assert (
v2_decode(v2_reference_http_apmtelemetry_payload_data)
== v2_reference_http_apmtelemetry_payload_data_raw
)
13 changes: 11 additions & 2 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ async def test_two_sessions(


async def test_session_requests(
agent, do_reference_v04_http_trace, do_reference_v06_http_stats
agent,
do_reference_v04_http_trace,
do_reference_v06_http_stats,
do_reference_v2_http_apmtelemetry,
):
resp = await agent.get(
"/test/session/start", params={"test_session_token": "test_case"}
Expand All @@ -126,13 +129,15 @@ async def test_session_requests(
assert resp.status == 200, await resp.text()
resp = await do_reference_v06_http_stats(token="test_case")
assert resp.status == 200, await resp.text()
resp = await do_reference_v2_http_apmtelemetry(token="test_case")
assert resp.status == 200, await resp.text()

resp = await agent.get(
"/test/session/requests", params={"test_session_token": "test_case"}
)
requests = await resp.json()
assert resp.status == 200
assert len(requests) == 3
assert len(requests) == 4
assert "X-Datadog-Trace-Count" in requests[0]["headers"]
body = requests[0]["body"]
traces = msgpack.unpackb(base64.b64decode(body))
Expand All @@ -142,3 +147,7 @@ async def test_session_requests(
assert requests[0]["url"].endswith("/v0.4/traces?test_session_token=test_case")
assert requests[1]["url"].endswith("/v0.4/traces?test_session_token=test_case")
assert requests[2]["url"].endswith("/v0.6/stats?test_session_token=test_case")
assert requests[3]["method"] == "POST"
assert requests[3]["url"].endswith(
"/telemetry/proxy/api/v2/apmtelemetry?test_session_token=test_case"
)

0 comments on commit 2653859

Please sign in to comment.