Skip to content

Commit

Permalink
Add trace stats snapshot functionality
Browse files Browse the repository at this point in the history
Implement trace stats snapshotting so that trace stats are included in
snapshots.
  • Loading branch information
Kyle-Verhoog committed Mar 31, 2022
1 parent 4318bfd commit 1ea0020
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 9 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ Test session token for a test case. **Ensure this value is unique to avoid confl

### /test/session/snapshot

Perform a snapshot generation or comparison on the data received during the session.

Snapshots are generated when the test agent is not in CI mode and no snapshot file is present. Otherwise, a
snapshot comparison will be performed against the existing snapshot file.


#### [optional\*] `?test_session_token=`
#### [optional\*] `X-Datadog-Test-Session-Token`
To run test cases in parallel this HTTP header must be specified. All test
Expand Down Expand Up @@ -231,6 +237,8 @@ Warning: it is an error to specify both `file` and `dir`.

Note: the file extension will be appended to the filename.

`_tracestats` will be appended to the filename for trace stats requests.


### /test/session/requests

Expand Down
60 changes: 52 additions & 8 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from . import _get_version
from . import trace_snapshot
from . import tracestats_snapshot
from .checks import CheckTrace
from .checks import Checks
from .checks import start_trace
Expand Down Expand Up @@ -274,6 +275,7 @@ async def handle_session_start(self, request: Request) -> web.Response:
return web.HTTPOk()

async def handle_snapshot(self, request: Request) -> web.Response:
"""Generate a snapshot or perform a snapshot test."""
token = request["session_token"]
snap_dir = request.url.query.get("dir", request.app["snapshot_dir"])
snap_ci_mode = request.app["snapshot_ci_mode"]
Expand Down Expand Up @@ -301,34 +303,76 @@ async def handle_snapshot(self, request: Request) -> web.Response:
else:
snap_file = os.path.join(snap_dir, token)

# The logic from here is mostly duplicated for traces and trace stats.
# If another data type is to be snapshotted then it probably makes sense to abstract away
# the required pieces of snapshotting (loading, generating and comparing).

# For backwards compatibility traces don't have a postfix of `_trace.json`
trace_snap_file = f"{snap_file}.json"
tracestats_snap_file = f"{snap_file}_tracestats.json"

frame.add_item(f"Trace File: {trace_snap_file}")
log.info("using snapshot file %r", trace_snap_file)
frame.add_item(f"Stats File: {tracestats_snap_file}")
log.info(
"using snapshot files %r and %r", trace_snap_file, tracestats_snap_file
)

trace_snap_path_exists = os.path.exists(trace_snap_file)
if snap_ci_mode and not trace_snap_path_exists:

received_traces = await self._traces_by_session(token)
if snap_ci_mode and received_traces and not trace_snap_path_exists:
raise AssertionError(
f"Trace snapshot file '{trace_snap_file}' not found. "
"Perhaps the file was not checked into source control? "
"The snapshot file is automatically generated when the test case is run when not in CI mode."
)
elif trace_snap_path_exists:
# Do the snapshot comparison
received_traces = await self._traces_by_session(token)
with open(trace_snap_file, mode="r") as f:
raw_snapshot = json.load(f)

trace_snapshot.snapshot(
expected_traces=raw_snapshot,
received_traces=received_traces,
ignored=span_ignores,
)
else:
elif received_traces:
# Create a new snapshot for the data received
traces = await self._traces_by_session(token)
with open(trace_snap_file, mode="w") as f:
f.write(trace_snapshot.generate_snapshot(traces))
log.info("wrote new snapshot to %r", os.path.abspath(trace_snap_file))
f.write(trace_snapshot.generate_snapshot(received_traces))
log.info(
"wrote new trace snapshot to %r", os.path.abspath(trace_snap_file)
)

# Get all stats buckets from the payloads since we don't care about the other fields (hostname, env, etc)
# in the payload.
received_stats = [
bucket
for p in (await self._tracestats_by_session(token))
for bucket in p["Stats"]
]
tracestats_snap_path_exists = os.path.exists(tracestats_snap_file)
if snap_ci_mode and received_stats and not tracestats_snap_path_exists:
raise AssertionError(
f"Trace stats snapshot file '{tracestats_snap_file}' not found. "
"Perhaps the file was not checked into source control? "
"The snapshot file is automatically generated when the test case is run when not in CI mode."
)
elif tracestats_snap_path_exists:
# Do the snapshot comparison
with open(tracestats_snap_file, mode="r") as f:
raw_snapshot = json.load(f)
tracestats_snapshot.snapshot(
expected_stats=raw_snapshot,
received_stats=received_stats,
)
elif received_stats:
# Create a new snapshot for the data received
with open(tracestats_snap_file, mode="w") as f:
f.write(tracestats_snapshot.generate(received_stats))
log.info(
"wrote new tracestats snapshot to %r",
os.path.abspath(tracestats_snap_file),
)
return web.HTTPOk()

async def handle_session_traces(self, request: Request) -> web.Response:
Expand Down
83 changes: 83 additions & 0 deletions ddapm_test_agent/tracestats_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import json
from typing import List

from .checks import CheckTrace
from .tracestats import StatsBucket


def _normalize_statsbuckets(buckets: List[StatsBucket]) -> List[StatsBucket]:
"""Normalize the stats bucket by normalizing the time buckets."""
# Make a copy of the buckets, note that the sketches are not copied.
normed_buckets = []
for bucket in buckets:
bcopy = bucket.copy()
bcopy["Stats"] = [
aggr.copy() for aggr in bucket["Stats"]
] # Copy the aggregations
normed_buckets.append(bcopy)

# Order the buckets by time
normed_buckets = sorted(normed_buckets, key=lambda b: b["Start"])

# Sort aggr for a bucket alphanumerically
for bucket in normed_buckets:
# Sort aggrs by name then resource then hits
bucket["Stats"] = sorted(
bucket["Stats"], key=lambda b: (b["Name"], b["Resource"], ["Hits"])
)

start = normed_buckets[0]["Start"]
for b in normed_buckets:
b["Start"] -= start

return normed_buckets


def snapshot(
    expected_stats: List[StatsBucket], received_stats: List[StatsBucket]
) -> None:
    """Compare received stats buckets against the expected snapshot.

    Both sides are normalized first so that only ordering and counts matter,
    not absolute bucket start times. Raises ``AssertionError`` on the first
    mismatch found.
    """
    expected = _normalize_statsbuckets(expected_stats)
    received = _normalize_statsbuckets(received_stats)

    # TODO: do better matching and comparing to aid in debugging
    assert len(received) == len(
        expected
    ), f"Number of stats buckets ({len(received)}) doesn't match expected ({len(expected)})."

    with CheckTrace.add_frame(f"snapshot compare of {len(received)} stats buckets"):
        # Rough pairwise comparison of buckets in normalized order.
        for idx, (exp_bucket, rec_bucket) in enumerate(zip(expected, received)):
            exp_aggrs = exp_bucket["Stats"]
            rec_aggrs = rec_bucket["Stats"]
            assert len(exp_aggrs) == len(
                rec_aggrs
            ), f"Number of aggregations ({len(rec_aggrs)}) in bucket {idx} doesn't match expected ({len(exp_aggrs)})."

            for exp_aggr, rec_aggr in zip(exp_aggrs, rec_aggrs):
                # Duration and the ok/error sketches are noisy, so only the
                # stable counting fields are compared for now.
                for field in (
                    "Name",
                    "Resource",
                    "Type",
                    "Synthetics",
                    "Hits",
                    "TopLevelHits",
                    "Errors",
                ):
                    expected_value = exp_aggr[field]  # type: ignore
                    received_value = rec_aggr[field]  # type: ignore
                    if expected_value != received_value:
                        raise AssertionError(
                            f"Expected value ('{expected_value}') for field '{field}' does not match received value '{received_value}'"
                        )


def generate(received_stats: List[StatsBucket]) -> str:
    """Serialize the normalized stats buckets as pretty-printed JSON.

    The output is newline-terminated, suitable for writing directly to a
    snapshot file.
    """
    normalized = _normalize_statsbuckets(received_stats)
    return json.dumps(normalized, indent=2) + "\n"
8 changes: 8 additions & 0 deletions releasenotes/notes/trace-stats-snapshot-718ca37385c93e1e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
features:
- |
Implement trace stats snapshotting.
    Trace stats are now included in the snapshot behaviour provided by the test agent.
Similar to traces, trace stats snapshots are output to a json file.
20 changes: 20 additions & 0 deletions tests/integration_snapshots/test_trace_stats_tracestats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"Start": 0,
"Duration": 10000000000,
"Stats": [
{
"Name": "http.request",
"Resource": "/users/view",
"Type": null,
"Synthetics": false,
"Hits": 5,
"TopLevelHits": 5,
"Duration": 293000,
"Errors": 1,
"OkSummary": 1046,
"ErrorSummary": 1046
}
]
}
]
122 changes: 122 additions & 0 deletions tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import pytest

from ddapm_test_agent import trace_snapshot
from ddapm_test_agent import tracestats_snapshot
from ddapm_test_agent.trace import copy_span
from ddapm_test_agent.trace import set_attr
from ddapm_test_agent.trace import set_meta_tag
from ddapm_test_agent.trace import set_metric_tag
from ddapm_test_agent.tracestats import StatsAggr
from ddapm_test_agent.tracestats import StatsBucket

from .conftest import v04_trace
from .trace_utils import random_trace
Expand Down Expand Up @@ -243,6 +246,88 @@ def test_generate_trace_snapshot(trace, expected):
assert trace_snapshot.generate_snapshot(trace) == expected


@pytest.mark.parametrize(
    "buckets,expected",
    [
        (
            [
                StatsBucket(  # noqa
                    Start=1000,
                    Duration=10,
                    Stats=[
                        # Not using all the fields of StatsAggr, hence the ignores
                        StatsAggr(  # type: ignore
                            Name="http.request",
                            Resource="/users/list",
                            Hits=10,
                            TopLevelHits=10,
                            Duration=100,
                        ),  # noqa
                        StatsAggr(  # type: ignore
                            Name="http.request",
                            Resource="/users/create",
                            Hits=5,
                            TopLevelHits=5,
                            Duration=10,
                        ),  # noqa
                    ],
                ),
                StatsBucket(
                    Start=1010,
                    Duration=10,
                    Stats=[
                        StatsAggr(  # type: ignore
                            Name="http.request",
                            Resource="/users/list",
                            Hits=20,
                            TopLevelHits=20,
                            Duration=200,
                        ),
                    ],
                ),
            ],
            # Expected output: bucket starts rebased to 0, aggregations sorted
            # by (Name, Resource, Hits), serialized with json indent=2 and a
            # trailing newline.
            """[
  {
    "Start": 0,
    "Duration": 10,
    "Stats": [
      {
        "Name": "http.request",
        "Resource": "/users/create",
        "Hits": 5,
        "TopLevelHits": 5,
        "Duration": 10
      },
      {
        "Name": "http.request",
        "Resource": "/users/list",
        "Hits": 10,
        "TopLevelHits": 10,
        "Duration": 100
      }
    ]
  },
  {
    "Start": 10,
    "Duration": 10,
    "Stats": [
      {
        "Name": "http.request",
        "Resource": "/users/list",
        "Hits": 20,
        "TopLevelHits": 20,
        "Duration": 200
      }
    ]
  }
]\n""",
        ),
    ],
)
def test_generate_tracestats_snapshot(buckets, expected):
    """Generated tracestats snapshots are normalized, time-rebased JSON."""
    assert tracestats_snapshot.generate(buckets) == expected


async def test_snapshot_custom_dir(agent, tmp_path, do_reference_v04_http_trace):
resp = await do_reference_v04_http_trace(token="test_case")
assert resp.status == 200
Expand Down Expand Up @@ -278,3 +363,40 @@ async def test_snapshot_custom_file(agent, tmp_path, do_reference_v04_http_trace
assert os.path.exists(custom_file), custom_file
with open(custom_file, mode="r") as f:
assert "".join(f.readlines()) != ""


@pytest.mark.parametrize("snapshot_ci_mode", [False, True])
async def test_snapshot_tracestats(
    agent, tmp_path, snapshot_ci_mode, do_reference_v06_http_stats, snapshot_dir
):
    """Trace stats snapshots are generated (non-CI) or required/compared (CI)."""
    # Submit a reference v0.6 stats payload under a session token.
    stats_resp = await do_reference_v06_http_stats(token="test_case")
    assert stats_resp.status == 200

    snap_path = snapshot_dir / "test_case_tracestats.json"
    params = {"test_session_token": "test_case"}
    snap_resp = await agent.get("/test/session/snapshot", params=params)
    clear_resp = await agent.get("/test/session/clear", params=params)
    assert clear_resp.status == 200, await clear_resp.text()

    if snapshot_ci_mode:
        # In CI mode a missing snapshot file is an error.
        assert snap_resp.status == 400
        assert (
            f"Trace stats snapshot file '{snap_path}' not found"
            in await snap_resp.text()
        )
    else:
        # Outside CI mode the first invocation writes a non-empty snapshot file.
        assert snap_resp.status == 200
        assert os.path.exists(snap_path)
        with open(snap_path, mode="r") as f:
            assert "".join(f.readlines()) != ""

        # Run again with the file present to perform an actual comparison.
        stats_resp = await do_reference_v06_http_stats(token="test_case")
        assert stats_resp.status == 200, await stats_resp.text()

        snap_resp = await agent.get("/test/session/snapshot", params=params)
        assert snap_resp.status == 200, await snap_resp.text()
Loading

0 comments on commit 1ea0020

Please sign in to comment.