Skip to content

Commit

Permalink
Start of v0.7/traces support to test PHP sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
bantonsson committed Jun 5, 2024
1 parent aa0a45a commit 3d721a6
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 51 deletions.
78 changes: 32 additions & 46 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from .trace import TraceMap
from .trace import decode_v04 as trace_decode_v04
from .trace import decode_v05 as trace_decode_v05
from .trace import decode_v07 as trace_decode_v07
from .trace import pprint_trace
from .trace import v04TracePayload
from .trace_checks import CheckMetaTracerVersionHeader
Expand Down Expand Up @@ -126,11 +127,7 @@ async def handle_exception_middleware(request: Request, handler: _Handler) -> we

async def _forward_request(request_data: bytes, headers: Mapping[str, str], full_agent_url: str) -> ClientResponse:
async with ClientSession() as session:
async with session.post(
full_agent_url,
headers=headers,
data=request_data,
) as resp:
async with session.post(full_agent_url, headers=headers, data=request_data,) as resp:
assert resp.status == 200, f"Request to agent unsuccessful, received [{resp.status}] response."

if "text/html" in resp.content_type:
Expand Down Expand Up @@ -166,9 +163,7 @@ async def _prepare_and_send_request(data: bytes, request: Request, headers: Mapp

client_response = await _forward_request(data, headers, full_agent_url)
return web.Response(
status=client_response.status,
headers=client_response.headers,
body=await client_response.read(),
status=client_response.status, headers=client_response.headers, body=await client_response.read(),
)


Expand Down Expand Up @@ -223,6 +218,7 @@ def __init__(self) -> None:
self._forward_endpoints: List[str] = [
"/v0.4/traces",
"/v0.5/traces",
"/v0.7/traces",
"/v0.6/stats",
"/v0.7/config",
"/telemetry/proxy/api/v2/apmtelemetry",
Expand Down Expand Up @@ -397,6 +393,8 @@ async def _traces_from_request(self, req: Request) -> List[List[Span]]:
return self._decode_v04_traces(req)
elif req.match_info.handler == self.handle_v05_traces:
return self._decode_v05_traces(req)
elif req.match_info.handler == self.handle_v07_traces:
return self._decode_v07_traces(req)
return []

async def _traces_by_session(self, token: Optional[str]) -> List[Trace]:
Expand Down Expand Up @@ -456,9 +454,7 @@ async def _tracestats_by_session(self, token: Optional[str]) -> List[v06StatsPay
return stats

async def _integration_requests_by_session(
self,
token: Optional[str],
include_sent_integrations: Optional[bool] = False,
self, token: Optional[str], include_sent_integrations: Optional[bool] = False,
) -> List[Request]:
"""Get all requests with an associated tested Integration."""
integration_requests: List[Request] = []
Expand Down Expand Up @@ -519,6 +515,10 @@ def _decode_v05_traces(self, request: Request) -> v04TracePayload:
raw_data = self._request_data(request)
return trace_decode_v05(raw_data)

def _decode_v07_traces(self, request: Request) -> v04TracePayload:
    """Decode the request body as a v0.7 msgpack trace payload."""
    return trace_decode_v07(self._request_data(request))

def _decode_v06_tracestats(self, request: Request) -> v06StatsPayload:
raw_data = self._request_data(request)
return tracestats_decode_v06(raw_data)
Expand All @@ -529,13 +529,14 @@ async def handle_v04_traces(self, request: Request) -> web.Response:
async def handle_v05_traces(self, request: Request) -> web.Response:
    """Handle a trace payload submitted to the /v0.5/traces endpoint."""
    response = await self._handle_traces(request, version="v0.5")
    return response

async def handle_v07_traces(self, request: Request) -> web.Response:
    """Handle a trace payload submitted to the /v0.7/traces endpoint."""
    response = await self._handle_traces(request, version="v0.7")
    return response

async def handle_v06_tracestats(self, request: Request) -> web.Response:
    """Handle a stats payload submitted to the /v0.6/stats endpoint.

    Decodes the payload, logs how many stats buckets were received and
    replies 200 OK.
    """
    stats = self._decode_v06_tracestats(request)
    nstats = len(stats["Stats"])
    log.info(
        "received /v0.6/stats payload with %r stats bucket%s",
        nstats,
        # Pluralize correctly: only a count of exactly one drops the "s"
        # (the original used `if nstats`, yielding "1 buckets"/"0 bucket").
        "s" if nstats != 1 else "",
    )
    return web.HTTPOk()

Expand Down Expand Up @@ -655,6 +656,7 @@ async def handle_info(self, request: Request) -> web.Response:
"endpoints": [
"/v0.4/traces",
"/v0.5/traces",
"/v0.7/traces",
"/v0.6/stats",
"/telemetry/proxy/",
"/v0.7/config",
Expand All @@ -666,7 +668,7 @@ async def handle_info(self, request: Request) -> web.Response:
}
)

async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"]) -> web.Response:
async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5", "v0.7"]) -> web.Response:
token = request["session_token"]
checks: Checks = request.app["checks"]
headers = request.headers
Expand All @@ -685,17 +687,15 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"
traces = self._decode_v04_traces(request)
elif version == "v0.5":
traces = self._decode_v05_traces(request)
elif version == "v0.7":
traces = self._decode_v07_traces(request)
log.info(
"received trace for token %r payload with %r trace chunks",
token,
len(traces),
"received trace for token %r payload with %r trace chunks", token, len(traces),
)
for i, trace in enumerate(traces):
try:
log.info(
"Chunk %d\n%s",
i,
pprint_trace(trace, request.app["log_span_fmt"]),
"Chunk %d\n%s", i, pprint_trace(trace, request.app["log_span_fmt"]),
)
except ValueError:
log.info("Chunk %d could not be displayed (might be incomplete).", i)
Expand All @@ -713,9 +713,7 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"

with CheckTrace.add_frame(f"payload ({len(traces)} traces)"):
await checks.check(
"trace_count_header",
headers=headers,
num_traces=len(traces),
"trace_count_header", headers=headers, num_traces=len(traces),
)
except MsgPackExtraDataException as e:
log.error(f"Error unpacking trace bytes with Msgpack: {str(e)}, error {e}")
Expand All @@ -736,10 +734,7 @@ async def handle_snapshot(self, request: Request) -> web.Response:
snap_dir = request.url.query.get("dir", request.app["snapshot_dir"])
snap_ci_mode = request.app["snapshot_ci_mode"]
log.info(
"performing snapshot with token=%r, ci_mode=%r and snapshot directory=%r",
token,
snap_ci_mode,
snap_dir,
"performing snapshot with token=%r, ci_mode=%r and snapshot directory=%r", token, snap_ci_mode, snap_dir,
)

# Get the span attributes that are to be ignored for this snapshot.
Expand Down Expand Up @@ -794,9 +789,7 @@ async def handle_snapshot(self, request: Request) -> web.Response:
with open(trace_snap_file, mode="r") as f:
raw_snapshot = json.load(f)
trace_snapshot.snapshot(
expected_traces=raw_snapshot,
received_traces=received_traces,
ignored=span_ignores,
expected_traces=raw_snapshot, received_traces=received_traces, ignored=span_ignores,
)
elif received_traces:
# Create a new snapshot for the data received
Expand All @@ -819,16 +812,14 @@ async def handle_snapshot(self, request: Request) -> web.Response:
with open(tracestats_snap_file, mode="r") as f:
raw_snapshot = json.load(f)
tracestats_snapshot.snapshot(
expected_stats=raw_snapshot,
received_stats=received_stats,
expected_stats=raw_snapshot, received_stats=received_stats,
)
elif received_stats:
# Create a new snapshot for the data received
with open(tracestats_snap_file, mode="w") as f:
f.write(tracestats_snapshot.generate(received_stats))
log.info(
"wrote new tracestats snapshot to %r",
os.path.abspath(tracestats_snap_file),
"wrote new tracestats snapshot to %r", os.path.abspath(tracestats_snap_file),
)
return web.HTTPOk()

Expand Down Expand Up @@ -864,6 +855,7 @@ async def handle_session_requests(self, request: Request) -> web.Response:
if req.match_info.handler not in (
self.handle_v04_traces,
self.handle_v05_traces,
self.handle_v07_traces,
self.handle_v06_tracestats,
self.handle_v01_pipelinestats,
self.handle_v2_apmtelemetry,
Expand Down Expand Up @@ -1091,6 +1083,8 @@ def make_app(
web.put("/v0.4/traces", agent.handle_v04_traces),
web.post("/v0.5/traces", agent.handle_v05_traces),
web.put("/v0.5/traces", agent.handle_v05_traces),
web.post("/v0.7/traces", agent.handle_v07_traces),
web.put("/v0.7/traces", agent.handle_v07_traces),
web.post("/v0.6/stats", agent.handle_v06_tracestats),
web.post("/v0.1/pipeline_stats", agent.handle_v01_pipelinestats),
web.put("/v0.6/stats", agent.handle_v06_tracestats),
Expand Down Expand Up @@ -1150,16 +1144,9 @@ def make_app(
def main(args: Optional[List[str]] = None) -> None:
if args is None:
args = sys.argv[1:]
parser = argparse.ArgumentParser(
description="Datadog APM test agent",
prog="ddapm-test-agent",
)
parser = argparse.ArgumentParser(description="Datadog APM test agent", prog="ddapm-test-agent",)
parser.add_argument(
"-v",
"--version",
action="store_true",
dest="version",
help="Print version info and exit.",
"-v", "--version", action="store_true", dest="version", help="Print version info and exit.",
)
parser.add_argument("-p", "--port", type=int, default=int(os.environ.get("PORT", 8126)))
parser.add_argument(
Expand Down Expand Up @@ -1275,8 +1262,7 @@ def main(args: Optional[List[str]] = None) -> None:

if parsed_args.trace_request_delay is not None:
log.info(
"Trace request stall seconds setting set to %r.",
parsed_args.trace_request_delay,
"Trace request stall seconds setting set to %r.", parsed_args.trace_request_delay,
)
if not os.path.exists(parsed_args.snapshot_dir) or not os.access(parsed_args.snapshot_dir, os.W_OK | os.X_OK):
log.warning(
Expand Down
63 changes: 58 additions & 5 deletions ddapm_test_agent/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class Span(TypedDict):
v04TracePayload = List[List[Span]]
TraceMap = OrderedDict[int, Trace]


# TODO:ban add extra tags to add to the span
# TODO:ban warn about dropping metastruct
def verify_span(d: Any) -> Span:
try:
assert isinstance(d, dict), f"Expected 'span' to be of type: '{dict}', got: '{type(d)}'"
Expand Down Expand Up @@ -239,10 +240,7 @@ def dfs_order_with_depth(trace: Trace) -> Generator[Tuple[Span, int], None, None
children = [(_, depth) for _ in cmap[c["span_id"]]] + children


def pprint_trace(
trace: Trace,
fmt: Union[str, Callable[[Span], str]],
) -> str:
def pprint_trace(trace: Trace, fmt: Union[str, Callable[[Span], str]],) -> str:
cmap = child_map(trace)
stack: List[Tuple[str, str, Span]] = [("", "", root_span(trace))]
s = ""
Expand Down Expand Up @@ -447,3 +445,58 @@ def decode_v05(data: bytes) -> v04TracePayload:
trace.append(span)
traces.append(trace)
return traces


def decode_v07(data: bytes) -> v04TracePayload:
    """Decode a v0.7 trace payload.

    The v0.7 format is almost the same as the v0.4 format, but there is
    some extra structure to the payload.

    These are the types of the payload:

        class TracerPayloadV07(TypedDict):
            container_id: NotRequired[str]
            language_name: NotRequired[str]
            language_version: NotRequired[str]
            tracer_version: NotRequired[str]
            runtime_id: NotRequired[str]
            chunks: List[TraceChunkV07]
            tags: NotRequired[Dict[str, str]]
            env: NotRequired[str]
            hostname: NotRequired[str]
            app_version: NotRequired[str]

        class TraceChunkV07(TypedDict):
            priority: int
            origin: str
            spans: List[Span]
            tags: NotRequired[Dict[str, str]]
            droppedTrace: NotRequired[bool]
    """
    payload = msgpack.unpackb(data)
    return _verify_v07_payload(payload)


def _verify_v07_payload(data: Any) -> v04TracePayload:
    """Validate a decoded v0.7 payload and return its chunks as v0.4-style traces.

    Raises:
        TypeError: if the payload is not a map with a list-valued "chunks" key,
            or if any chunk fails validation.
    """
    if not isinstance(data, dict):
        raise TypeError("Trace payload must be a map, got type %r." % type(data))
    if "chunks" not in data:
        raise TypeError("Trace payload must contain a 'chunks' key.")
    if not isinstance(data["chunks"], list):
        raise TypeError("Trace payload 'chunks' must be a list.")
    # TODO:ban pull out the tags and other things that should be applied to all spans
    traces: List[List[Span]] = []
    for chunk in data["chunks"]:
        traces.append(_verify_v07_chunk(chunk))
    # Bug fix: return the verified traces, not the raw payload dict. The
    # original returned `cast(v04TracePayload, data)`, handing callers the
    # whole payload map instead of the declared List[List[Span]].
    return traces


def _verify_v07_chunk(chunk: Any) -> List[Span]:
    """Validate a single v0.7 trace chunk and return its verified spans."""
    if not isinstance(chunk, dict):
        raise TypeError("Chunk must be a map.")
    if "spans" not in chunk:
        raise TypeError("Chunk must contain a 'spans' key.")
    spans = chunk["spans"]
    if not isinstance(spans, list):
        raise TypeError("Chunk 'spans' must be a list.")
    # TODO:ban pull out the tags and other things that should be applied to all spans
    return v04_verify_trace(spans)

0 comments on commit 3d721a6

Please sign in to comment.