From 3d721a628dffd78ba28803df69b3994c7e0ee1dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Wed, 5 Jun 2024 16:57:01 +0200 Subject: [PATCH] Start of v0.7/traces support to test PHP sidecar --- ddapm_test_agent/agent.py | 78 ++++++++++++++++----------------------- ddapm_test_agent/trace.py | 63 ++++++++++++++++++++++++++++--- 2 files changed, 90 insertions(+), 51 deletions(-) diff --git a/ddapm_test_agent/agent.py b/ddapm_test_agent/agent.py index c7c4de41..6cfe7308 100644 --- a/ddapm_test_agent/agent.py +++ b/ddapm_test_agent/agent.py @@ -49,6 +49,7 @@ from .trace import TraceMap from .trace import decode_v04 as trace_decode_v04 from .trace import decode_v05 as trace_decode_v05 +from .trace import decode_v07 as trace_decode_v07 from .trace import pprint_trace from .trace import v04TracePayload from .trace_checks import CheckMetaTracerVersionHeader @@ -126,11 +127,7 @@ async def handle_exception_middleware(request: Request, handler: _Handler) -> we async def _forward_request(request_data: bytes, headers: Mapping[str, str], full_agent_url: str) -> ClientResponse: async with ClientSession() as session: - async with session.post( - full_agent_url, - headers=headers, - data=request_data, - ) as resp: + async with session.post(full_agent_url, headers=headers, data=request_data,) as resp: assert resp.status == 200, f"Request to agent unsuccessful, received [{resp.status}] response." if "text/html" in resp.content_type: @@ -166,9 +163,7 @@ async def _prepare_and_send_request(data: bytes, request: Request, headers: Mapp client_response = await _forward_request(data, headers, full_agent_url) return web.Response( - status=client_response.status, - headers=client_response.headers, - body=await client_response.read(), + status=client_response.status, headers=client_response.headers, body=await client_response.read(), ) @@ -223,6 +218,7 @@ def __init__(self) -> None: self._forward_endpoints: List[str] = [ "/v0.4/traces", "/v0.5/traces", + "/v0.7/traces", "/v0.6/stats", "/v0.7/config", "/telemetry/proxy/api/v2/apmtelemetry", @@ -397,6 +393,8 @@ async def _traces_from_request(self, req: Request) -> List[List[Span]]: return self._decode_v04_traces(req) elif req.match_info.handler == self.handle_v05_traces: return self._decode_v05_traces(req) + elif req.match_info.handler == self.handle_v07_traces: + return self._decode_v07_traces(req) return [] async def _traces_by_session(self, token: Optional[str]) -> List[Trace]: @@ -456,9 +454,7 @@ async def _tracestats_by_session(self, token: Optional[str]) -> List[v06StatsPay return stats async def _integration_requests_by_session( - self, - token: Optional[str], - include_sent_integrations: Optional[bool] = False, + self, token: Optional[str], include_sent_integrations: Optional[bool] = False, ) -> List[Request]: """Get all requests with an associated tested Integration.""" integration_requests: List[Request] = [] @@ -519,6 +515,10 @@ def _decode_v05_traces(self, request: Request) -> v04TracePayload: raw_data = self._request_data(request) return trace_decode_v05(raw_data) + def _decode_v07_traces(self, request: Request) -> v04TracePayload: + raw_data = self._request_data(request) + return trace_decode_v07(raw_data) + def _decode_v06_tracestats(self, request: Request) -> v06StatsPayload: raw_data = self._request_data(request) return tracestats_decode_v06(raw_data) @@ -529,13 +529,14 @@ async def handle_v04_traces(self, request: Request) -> web.Response: async def handle_v05_traces(self, request: Request) -> web.Response: return await self._handle_traces(request, version="v0.5") + async def handle_v07_traces(self, request: Request) -> web.Response: + return await self._handle_traces(request, version="v0.7") + async def handle_v06_tracestats(self, request: Request) -> web.Response: stats = self._decode_v06_tracestats(request) nstats = len(stats["Stats"]) log.info( - "received /v0.6/stats payload with %r stats bucket%s", - nstats, - "s" if nstats else "", + "received /v0.6/stats payload with %r stats bucket%s", nstats, "s" if nstats else "", ) return web.HTTPOk() @@ -655,6 +656,7 @@ async def handle_info(self, request: Request) -> web.Response: "endpoints": [ "/v0.4/traces", "/v0.5/traces", + "/v0.7/traces", "/v0.6/stats", "/telemetry/proxy/", "/v0.7/config", @@ -666,7 +668,7 @@ async def handle_info(self, request: Request) -> web.Response: } ) - async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"]) -> web.Response: + async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5", "v0.7"]) -> web.Response: token = request["session_token"] checks: Checks = request.app["checks"] headers = request.headers @@ -685,17 +687,15 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5" traces = self._decode_v04_traces(request) elif version == "v0.5": traces = self._decode_v05_traces(request) + elif version == "v0.7": + traces = self._decode_v07_traces(request) log.info( - "received trace for token %r payload with %r trace chunks", - token, - len(traces), + "received trace for token %r payload with %r trace chunks", token, len(traces), ) for i, trace in enumerate(traces): try: log.info( - "Chunk %d\n%s", - i, - pprint_trace(trace, request.app["log_span_fmt"]), + "Chunk %d\n%s", i, pprint_trace(trace, request.app["log_span_fmt"]), ) except ValueError: log.info("Chunk %d could not be displayed (might be incomplete).", i) @@ -713,9 +713,7 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5" with CheckTrace.add_frame(f"payload ({len(traces)} traces)"): await checks.check( - "trace_count_header", - headers=headers, - num_traces=len(traces), + "trace_count_header", headers=headers, num_traces=len(traces), ) except MsgPackExtraDataException as e: log.error(f"Error unpacking trace bytes with Msgpack: {str(e)}, error {e}") @@ -736,10 +734,7 @@ async def handle_snapshot(self, request: Request) -> web.Response: snap_dir = request.url.query.get("dir", request.app["snapshot_dir"]) snap_ci_mode = request.app["snapshot_ci_mode"] log.info( - "performing snapshot with token=%r, ci_mode=%r and snapshot directory=%r", - token, - snap_ci_mode, - snap_dir, + "performing snapshot with token=%r, ci_mode=%r and snapshot directory=%r", token, snap_ci_mode, snap_dir, ) # Get the span attributes that are to be ignored for this snapshot. @@ -794,9 +789,7 @@ async def handle_snapshot(self, request: Request) -> web.Response: with open(trace_snap_file, mode="r") as f: raw_snapshot = json.load(f) trace_snapshot.snapshot( - expected_traces=raw_snapshot, - received_traces=received_traces, - ignored=span_ignores, + expected_traces=raw_snapshot, received_traces=received_traces, ignored=span_ignores, ) elif received_traces: # Create a new snapshot for the data received @@ -819,16 +812,14 @@ async def handle_snapshot(self, request: Request) -> web.Response: with open(tracestats_snap_file, mode="r") as f: raw_snapshot = json.load(f) tracestats_snapshot.snapshot( - expected_stats=raw_snapshot, - received_stats=received_stats, + expected_stats=raw_snapshot, received_stats=received_stats, ) elif received_stats: # Create a new snapshot for the data received with open(tracestats_snap_file, mode="w") as f: f.write(tracestats_snapshot.generate(received_stats)) log.info( - "wrote new tracestats snapshot to %r", - os.path.abspath(tracestats_snap_file), + "wrote new tracestats snapshot to %r", os.path.abspath(tracestats_snap_file), ) return web.HTTPOk() @@ -864,6 +855,7 @@ async def handle_session_requests(self, request: Request) -> web.Response: if req.match_info.handler not in ( self.handle_v04_traces, self.handle_v05_traces, + self.handle_v07_traces, self.handle_v06_tracestats, self.handle_v01_pipelinestats, self.handle_v2_apmtelemetry, @@ -1091,6 +1083,8 @@ def make_app( web.put("/v0.4/traces", agent.handle_v04_traces), web.post("/v0.5/traces", agent.handle_v05_traces), web.put("/v0.5/traces", agent.handle_v05_traces), + web.post("/v0.7/traces", agent.handle_v07_traces), + web.put("/v0.7/traces", agent.handle_v07_traces), web.post("/v0.6/stats", agent.handle_v06_tracestats), web.post("/v0.1/pipeline_stats", agent.handle_v01_pipelinestats), web.put("/v0.6/stats", agent.handle_v06_tracestats), @@ -1150,16 +1144,9 @@ def make_app( def main(args: Optional[List[str]] = None) -> None: if args is None: args = sys.argv[1:] - parser = argparse.ArgumentParser( - description="Datadog APM test agent", - prog="ddapm-test-agent", - ) + parser = argparse.ArgumentParser(description="Datadog APM test agent", prog="ddapm-test-agent",) parser.add_argument( - "-v", - "--version", - action="store_true", - dest="version", - help="Print version info and exit.", + "-v", "--version", action="store_true", dest="version", help="Print version info and exit.", ) parser.add_argument("-p", "--port", type=int, default=int(os.environ.get("PORT", 8126))) parser.add_argument( @@ -1275,8 +1262,7 @@ def main(args: Optional[List[str]] = None) -> None: if parsed_args.trace_request_delay is not None: log.info( - "Trace request stall seconds setting set to %r.", - parsed_args.trace_request_delay, + "Trace request stall seconds setting set to %r.", parsed_args.trace_request_delay, ) if not os.path.exists(parsed_args.snapshot_dir) or not os.access(parsed_args.snapshot_dir, os.W_OK | os.X_OK): log.warning( diff --git a/ddapm_test_agent/trace.py b/ddapm_test_agent/trace.py index 10ec21da..6feab162 100644 --- a/ddapm_test_agent/trace.py +++ b/ddapm_test_agent/trace.py @@ -93,7 +93,8 @@ class Span(TypedDict): v04TracePayload = List[List[Span]] TraceMap = OrderedDict[int, Trace] - +# TODO:ban add extra tags to add to the span +# TODO:ban warn about dropping metastruct def verify_span(d: Any) -> Span: try: assert isinstance(d, dict), f"Expected 'span' to be of type: '{dict}', got: '{type(d)}'" @@ -239,10 +240,7 @@ def dfs_order_with_depth(trace: Trace) -> Generator[Tuple[Span, int], None, None children = [(_, depth) for _ in cmap[c["span_id"]]] + children -def pprint_trace( - trace: Trace, - fmt: Union[str, Callable[[Span], str]], -) -> str: +def pprint_trace(trace: Trace, fmt: Union[str, Callable[[Span], str]],) -> str: cmap = child_map(trace) stack: List[Tuple[str, str, Span]] = [("", "", root_span(trace))] s = "" @@ -447,3 +445,58 @@ def decode_v05(data: bytes) -> v04TracePayload: trace.append(span) traces.append(trace) return traces + + +def decode_v07(data: bytes) -> v04TracePayload: + """ Decode a v07 trace payload. + The v07 format is almost the same as the v04 format, but the there is some + extra structure to the payload. + + These are the types of the payload: + + class TracerPayloadV07(TypedDict): + container_id: NotRequired[str] + language_name: NotRequired[str] + language_version: NotRequired[str] + tracer_version: NotRequired[str] + runtime_id: NotRequired[str] + chunks: List[TraceChunkV07] + tags: NotRequired[Dict[str, str]] + env: NotRequired[str] + hostname: NotRequired[str] + app_version: NotRequired[str] + + class TraceChunkV07(TypedDict): + priority: int + origin: str + spans: List[Span] + tags: NotRequired[Dict[str, str]] + droppedTrace: NotRequired[bool] + """ + payload = msgpack.unpackb(data) + return _verify_v07_payload(payload) + + +def _verify_v07_payload(data: Any) -> v04TracePayload: + if not isinstance(data, dict): + raise TypeError("Trace payload must be a map, got type %r." % type(data)) + if "chunks" not in data: + raise TypeError("Trace payload must contain a 'chunks' key.") + if not isinstance(data["chunks"], list): + raise TypeError("Trace payload 'chunks' must be a list.") + # TODO:ban pull out the tags and other things that should be applied to all spans + traces: List[List[Span]] = [] + for chunk in data["chunks"]: + traces.append(_verify_v07_chunk(chunk)) + return cast(v04TracePayload, data) + + +def _verify_v07_chunk(chunk: Any) -> List[Span]: + if not isinstance(chunk, dict): + raise TypeError("Chunk must be a map.") + if "spans" not in chunk: + raise TypeError("Chunk must contain a 'spans' key.") + if not isinstance(chunk["spans"], list): + raise TypeError("Chunk 'spans' must be a list.") + # TODO:ban pull out the tags and other things that should be applied to all spans + return v04_verify_trace(chunk["spans"])