Skip to content

Commit

Permalink
Support for v0.7 traces (#181)
Browse files Browse the repository at this point in the history
Add support for v0.7 traces.

---------

Co-authored-by: Bob Weinand <bob.weinand@datadoghq.com>
Co-authored-by: Julio Gonzalez <julio.gonzalez@datadoghq.com>
Co-authored-by: Kyle Verhoog <kyle@verhoog.ca>
  • Loading branch information
4 people authored Jun 7, 2024
1 parent aa0a45a commit 50c8fba
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 11 deletions.
41 changes: 30 additions & 11 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from .trace import TraceMap
from .trace import decode_v04 as trace_decode_v04
from .trace import decode_v05 as trace_decode_v05
from .trace import decode_v07 as trace_decode_v07
from .trace import pprint_trace
from .trace import v04TracePayload
from .trace_checks import CheckMetaTracerVersionHeader
Expand Down Expand Up @@ -124,7 +125,9 @@ async def handle_exception_middleware(request: Request, handler: _Handler) -> we
return web.HTTPBadRequest(reason=str(e))


async def _forward_request(request_data: bytes, headers: Mapping[str, str], full_agent_url: str) -> ClientResponse:
async def _forward_request(
request_data: bytes, headers: Mapping[str, str], full_agent_url: str
) -> tuple[ClientResponse, str]:
async with ClientSession() as session:
async with session.post(
full_agent_url,
Expand All @@ -133,25 +136,25 @@ async def _forward_request(request_data: bytes, headers: Mapping[str, str], full
) as resp:
assert resp.status == 200, f"Request to agent unsuccessful, received [{resp.status}] response."

if "text/html" in resp.content_type:
if "text/plain" in resp.content_type:
response_data = await resp.text()
log.info("Response %r from agent:", response_data)
else:
raw_response_data = await resp.read()
if len(raw_response_data) == 0:
log.info("Received empty response: %r from agent.", raw_response_data)
response_data = ""
else:
if isinstance(raw_response_data, bytes):
response_data = raw_response_data.decode()
try:
response_data = json.loads(raw_response_data)
response_data = json.dumps(json.loads(raw_response_data))
except json.JSONDecodeError as e:
log.warning("Error decoding response data: %s, data=%r", str(e), response_data)
log.warning("Original Request: %r", request_data)
response_data = ""
log.info("Response %r from agent:", response_data)
elif "text/plain" in resp.content_type:
log.info("Response %r from agent:", await resp.text())
else:
log.info("Response %r from agent:", await resp.json())
return resp
return resp, response_data


async def _prepare_and_send_request(data: bytes, request: Request, headers: Mapping[str, str]) -> web.Response:
Expand All @@ -164,11 +167,11 @@ async def _prepare_and_send_request(data: bytes, request: Request, headers: Mapp
log.info("Forwarding request to agent at %r", full_agent_url)
log.debug(f"Using headers: {headers}")

client_response = await _forward_request(data, headers, full_agent_url)
(client_response, body) = await _forward_request(data, headers, full_agent_url)
return web.Response(
status=client_response.status,
headers=client_response.headers,
body=await client_response.read(),
body=body,
)


Expand Down Expand Up @@ -223,6 +226,7 @@ def __init__(self) -> None:
self._forward_endpoints: List[str] = [
"/v0.4/traces",
"/v0.5/traces",
"/v0.7/traces",
"/v0.6/stats",
"/v0.7/config",
"/telemetry/proxy/api/v2/apmtelemetry",
Expand Down Expand Up @@ -397,6 +401,8 @@ async def _traces_from_request(self, req: Request) -> List[List[Span]]:
return self._decode_v04_traces(req)
elif req.match_info.handler == self.handle_v05_traces:
return self._decode_v05_traces(req)
elif req.match_info.handler == self.handle_v07_traces:
return self._decode_v07_traces(req)
return []

async def _traces_by_session(self, token: Optional[str]) -> List[Trace]:
Expand Down Expand Up @@ -519,6 +525,10 @@ def _decode_v05_traces(self, request: Request) -> v04TracePayload:
raw_data = self._request_data(request)
return trace_decode_v05(raw_data)

def _decode_v07_traces(self, request: Request) -> v04TracePayload:
raw_data = self._request_data(request)
return trace_decode_v07(raw_data)

def _decode_v06_tracestats(self, request: Request) -> v06StatsPayload:
raw_data = self._request_data(request)
return tracestats_decode_v06(raw_data)
Expand All @@ -529,6 +539,9 @@ async def handle_v04_traces(self, request: Request) -> web.Response:
async def handle_v05_traces(self, request: Request) -> web.Response:
return await self._handle_traces(request, version="v0.5")

async def handle_v07_traces(self, request: Request) -> web.Response:
return await self._handle_traces(request, version="v0.7")

async def handle_v06_tracestats(self, request: Request) -> web.Response:
stats = self._decode_v06_tracestats(request)
nstats = len(stats["Stats"])
Expand Down Expand Up @@ -655,6 +668,7 @@ async def handle_info(self, request: Request) -> web.Response:
"endpoints": [
"/v0.4/traces",
"/v0.5/traces",
"/v0.7/traces",
"/v0.6/stats",
"/telemetry/proxy/",
"/v0.7/config",
Expand All @@ -666,7 +680,7 @@ async def handle_info(self, request: Request) -> web.Response:
}
)

async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"]) -> web.Response:
async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5", "v0.7"]) -> web.Response:
token = request["session_token"]
checks: Checks = request.app["checks"]
headers = request.headers
Expand All @@ -685,6 +699,8 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"
traces = self._decode_v04_traces(request)
elif version == "v0.5":
traces = self._decode_v05_traces(request)
elif version == "v0.7":
traces = self._decode_v07_traces(request)
log.info(
"received trace for token %r payload with %r trace chunks",
token,
Expand Down Expand Up @@ -864,6 +880,7 @@ async def handle_session_requests(self, request: Request) -> web.Response:
if req.match_info.handler not in (
self.handle_v04_traces,
self.handle_v05_traces,
self.handle_v07_traces,
self.handle_v06_tracestats,
self.handle_v01_pipelinestats,
self.handle_v2_apmtelemetry,
Expand Down Expand Up @@ -1091,6 +1108,8 @@ def make_app(
web.put("/v0.4/traces", agent.handle_v04_traces),
web.post("/v0.5/traces", agent.handle_v05_traces),
web.put("/v0.5/traces", agent.handle_v05_traces),
web.post("/v0.7/traces", agent.handle_v07_traces),
web.put("/v0.7/traces", agent.handle_v07_traces),
web.post("/v0.6/stats", agent.handle_v06_tracestats),
web.post("/v0.1/pipeline_stats", agent.handle_v01_pipelinestats),
web.put("/v0.6/stats", agent.handle_v06_tracestats),
Expand Down
57 changes: 57 additions & 0 deletions ddapm_test_agent/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class Span(TypedDict):
TraceMap = OrderedDict[int, Trace]


# TODO:ban add extra tags to add to the span
# TODO:ban warn about dropping metastruct
def verify_span(d: Any) -> Span:
try:
assert isinstance(d, dict), f"Expected 'span' to be of type: '{dict}', got: '{type(d)}'"
Expand Down Expand Up @@ -447,3 +449,58 @@ def decode_v05(data: bytes) -> v04TracePayload:
trace.append(span)
traces.append(trace)
return traces


def decode_v07(data: bytes) -> v04TracePayload:
"""Decode a v07 trace payload.
The v07 format is almost the same as the v04 format, but the there is some
extra structure to the payload.
These are the types of the payload:
class TracerPayloadV07(TypedDict):
container_id: NotRequired[str]
language_name: NotRequired[str]
language_version: NotRequired[str]
tracer_version: NotRequired[str]
runtime_id: NotRequired[str]
chunks: List[TraceChunkV07]
tags: NotRequired[Dict[str, str]]
env: NotRequired[str]
hostname: NotRequired[str]
app_version: NotRequired[str]
class TraceChunkV07(TypedDict):
priority: int
origin: str
spans: List[Span]
tags: NotRequired[Dict[str, str]]
droppedTrace: NotRequired[bool]
"""
payload = msgpack.unpackb(data)
return _verify_v07_payload(payload)


def _verify_v07_payload(data: Any) -> v04TracePayload:
if not isinstance(data, dict):
raise TypeError("Trace payload must be a map, got type %r." % type(data))
if "chunks" not in data:
raise TypeError("Trace payload must contain a 'chunks' key.")
if not isinstance(data["chunks"], list):
raise TypeError("Trace payload 'chunks' must be a list.")
# TODO:ban pull out the tags and other things that should be applied to all spans
traces: List[List[Span]] = []
for chunk in data["chunks"]:
traces.append(_verify_v07_chunk(chunk))
return cast(v04TracePayload, traces)


def _verify_v07_chunk(chunk: Any) -> List[Span]:
if not isinstance(chunk, dict):
raise TypeError("Chunk must be a map.")
if "spans" not in chunk:
raise TypeError("Chunk must contain a 'spans' key.")
if not isinstance(chunk["spans"], list):
raise TypeError("Chunk 'spans' must be a list.")
# TODO:ban pull out the tags and other things that should be applied to all spans
return v04_verify_trace(chunk["spans"])
4 changes: 4 additions & 0 deletions releasenotes/notes/v0.7-traces-7ea3ebbed7502d01.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Add support for v0.7 traces.

0 comments on commit 50c8fba

Please sign in to comment.