Skip to content

Commit

Permalink
Merge branch 'master' into add-service-naming-checks-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Jul 12, 2023
2 parents 4fab7a1 + b6fa77d commit 7c1fe91
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 88 deletions.
47 changes: 45 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Please refer to `ddapm-test-agent-fmt --help` for more information.

- `PORT` [`8126`]: Port to listen on.

- `DISABLED_CHECKS` [`""`]: Comma-separated values of checks to disable.
- `ENABLED_CHECKS` [`""`]: Comma-separated values of checks to enable. Valid values can be found in [trace invariant checks](#Trace-invariant-checks)

- `LOG_LEVEL` [`"INFO"`]: Log level to use. DEBUG, INFO, WARNING, ERROR, CRITICAL.

Expand Down Expand Up @@ -325,13 +325,56 @@ The keys of the JSON body are `path` and `msg`
curl -X POST 'http://0.0.0.0:8126/test/session/responses/config/path' -d '{"path": "datadog/2/ASM_DATA/blocked_users/config", "msg": {"rules_data": []}}'
```


## /test/trace_check/failures (GET)
Get any Trace Check failures that occurred. Returns a `<Response 200>` if no Trace Check failures occurred, and a `<Response 400>` with the Trace Check Failure messages included in the response body. To be used in combination with `DD_POOL_TRACE_CHECK_FAILURES`, or else failures will not be saved within Test-Agent memory and a `<Response 200>` will always be returned.
Get Trace Check failures that occurred. If a token is included, trace failures for only that session token are returned unless used in conjunction with `return_all`, which can be used to return all failures regardless of the token supplied. This method returns a `<Response 200>` if no Trace Check failures are being returned and a `<Response 400>` if Trace Check failures are being returned. Trace Check failures are returned as a content type of text, with failure messages concatenated in the response body. Optionally, set the `use_json` query string parameter to `true` to return Trace Check failures as a JSON response in the following format:
```json
response = {
"<FAILING_CHECK_NAME>" : ["<FAILURE_MESSAGE_1>", "<FAILURE_MESSAGE_2>"]
}
```

NOTE: To be used in combination with `DD_POOL_TRACE_CHECK_FAILURES`, or else failures will not be saved within Test-Agent memory and a `<Response 200>` will always be returned.

#### [optional] `?test_session_token=`
#### [optional] `X-Datadog-Test-Session-Token`
#### [optional] `?use_json=`
#### [optional] `?return_all=`

```
curl -X GET 'http://0.0.0.0:8126/test/trace_check/failures'
```

## /test/trace_check/clear (GET)
Clear Trace Check failures that occurred. If a token is included, trace failures for only that session token are cleared unless used in conjunction with `clear_all`, which can be used to clear all failures (regardless of the session token supplied).

#### [optional] `?test_session_token=`
#### [optional] `X-Datadog-Test-Session-Token`
#### [optional] `?clear_all=`

```
curl -X GET 'http://0.0.0.0:8126/test/trace_check/clear'
```

## /test/trace_check/summary (GET)
Get Trace Check summary results. If a token is included, returns summary results only for Trace Checks run during the session. The `return_all` optional query string parameter can be used to return all trace check results (regardless of inputted session token). The method returns Trace Check results in the following JSON format:
```json
summary = {
"trace_content_length" : {
"Passed_Checks": 10,
"Failed_Checks": 0,
"Skipped_Checks": 4,
} ...
}
```

#### [optional] `?test_session_token=`
#### [optional] `X-Datadog-Test-Session-Token`
#### [optional] `?return_all=`

```
curl -X GET 'http://0.0.0.0:8126/test/trace_check/summary'
### /v0.1/pipeline_stats
Mimics the pipeline_stats endpoint of the agent, but always returns OK, and logs a line every time it's called.
Expand Down
139 changes: 105 additions & 34 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Literal
from typing import Optional
from typing import Set
from typing import Tuple
from typing import cast
from urllib.parse import urlparse
from urllib.parse import urlunparse
Expand Down Expand Up @@ -152,6 +153,22 @@ def update_trace_agent_port(url, new_port):
return new_url


def default_value_trace_check_results_by_check():
    """Return a new mapping of check name -> result counters.

    Used as a ``defaultdict`` factory: the returned mapping is itself a
    ``defaultdict`` whose missing keys are filled in by
    ``default_value_trace_results_summary``.
    """
    return defaultdict(default_value_trace_results_summary)


def default_value_trace_failures():
    """Return a new, empty list for collecting trace check failures.

    Used as a ``defaultdict`` factory, so every call must hand back a
    distinct list object.
    """
    return []


def default_value_trace_results_summary():
    """Return a new set of zeroed result counters for a single check.

    Used as a ``defaultdict`` factory, so every call must hand back a
    distinct dict object.
    """
    return {
        "Passed_Checks": 0,
        "Failed_Checks": 0,
        "Skipped_Checks": 0,
    }


class Agent:
def __init__(self):
"""Only store the requests sent to the agent. There are many representations
Expand All @@ -163,8 +180,10 @@ def __init__(self):
# Token to be used if running test cases synchronously
self._requests: List[Request] = []
self._rc_server = RemoteConfigServer()
self._trace_failures: Dict[str, List[str]] = defaultdict(list)
self._trace_check_results_by_check: Dict[str, Dict[str, int]] = {}
self._trace_failures: Dict[str, List[Tuple[CheckTrace, str]]] = defaultdict(default_value_trace_failures)
self._trace_check_results_by_check: Dict[str, Dict[str, Dict[str, int]]] = defaultdict(
default_value_trace_check_results_by_check
)
self._forward_endpoints: List[str] = [
"/v0.4/traces",
"/v0.5/traces",
Expand Down Expand Up @@ -193,36 +212,76 @@ async def traces(self) -> TraceMap:
return _traces

async def clear_trace_check_failures(self, request: Request) -> web.Response:
"""Clear traces by session token or all traces if none is provided."""
token = request.get("session_token", "default")

trace_failures = self._trace_failures.get(token, [])
log.info(f"Clearing {len(trace_failures)} Trace Check Failures for Token {token}")
"""Clear traces by session token provided."""
token = request["session_token"]
clear_all = "clear_all" in request.query and request.query["clear_all"].lower() == "true"
if clear_all:
failures_by_token = self._trace_failures
trace_failures = [value for sublist in failures_by_token.values() for value in sublist]
self._trace_failures = defaultdict(default_value_trace_failures)
self._trace_check_results_by_check = defaultdict(default_value_trace_check_results_by_check)
else:
trace_failures = self._trace_failures[token]
del self._trace_failures[token]
del self._trace_check_results_by_check[token]
log.info(f"Clearing {len(trace_failures)} Trace Check Failures for Token {token}, clear_all={clear_all}")
log.info(trace_failures)
del self._trace_failures[token]
self._trace_check_results_by_check = {}
return web.HTTPOk()

def get_trace_check_failures(self, request: Request) -> web.Response:
"""Return the Trace Check failures that occurred, if pooling is enabled as a request."""
token = request.get("session_token", "default")

trace_check_failures = self._trace_failures.get(token, [])
n_failures = len(trace_check_failures)
log.info(f"{n_failures} Trace Failures Occurred for Token {token}.")
trace_check_full_results = self._trace_check_results_by_check
"""Return the Trace Check failures that occurred, if pooling is enabled,
returned as either a Text (by default) or JSON response.
"""
token = request["session_token"]
return_all = "return_all" in request.query and request.query["return_all"].lower() == "true"

if return_all:
# check for whether to return all results
trace_check_failures = []
for f in self._trace_failures.values():
trace_check_failures.extend(f)
n_failures = len(trace_check_failures)
log.info(f"{n_failures} Trace Failures Occurred in Total")
else:
# or return results by token
trace_check_failures = self._trace_failures.get(token, [])
n_failures = len(trace_check_failures)
log.info(f"{n_failures} Trace Failures Occurred for Token {token}")
if n_failures > 0:
failure_message = f"APM Test Agent Validation failed with {n_failures} Trace Check failures.\n"
for trace_check_message in trace_check_failures:
failure_message += trace_check_message
failure_message += f"\nAPM Test Agent Trace Check Results by Check -------------------------------"
failure_message += f"\n{json.dumps(trace_check_full_results, indent=4)}"
return web.HTTPBadRequest(text=failure_message)
if "use_json" in request.query and request.query["use_json"].lower() == "true":
# check what response type to use
results: Dict[str, List[str]] = {}
for check_trace, failure_message in trace_check_failures:
results = check_trace.get_failures_by_check(results)
json_summary = json.dumps(results)
return web.HTTPBadRequest(body=json_summary, content_type="application/json")
else:
# or use default response of text
msg = f"APM Test Agent Validation failed with {n_failures} Trace Check failures.\n"
for check_trace, failure_message in trace_check_failures:
msg += failure_message
return web.HTTPBadRequest(text=msg)
else:
message = f"APM Test Agent Trace Check Results by Check --------------------------------"
message += f"\n{json.dumps(trace_check_full_results, indent=4)}"
return web.HTTPOk(text=message)

def get_trace_check_summary(self, request: Request) -> web.Response:
    """Return trace check result counters as a JSON response.

    When the ``return_all`` query parameter equals ``"true"``
    (case-insensitive), the counters stored for every session token are
    summed per check name. Otherwise only the counters stored for the
    request's ``session_token`` are returned, or an empty object if nothing
    is stored for that token.

    Raises:
        KeyError: if ``request`` carries no ``"session_token"`` entry.
    """
    token = request["session_token"]
    return_all = "return_all" in request.query and request.query["return_all"].lower() == "true"

    summary: Dict[str, Dict[str, int]]
    if return_all:
        summary = defaultdict(default_value_trace_results_summary)
        # Only the per-token results are needed here; iterating values()
        # avoids rebinding ``token`` to a loop variable that is never used.
        for token_results in self._trace_check_results_by_check.values():
            for check_name, check_results in token_results.items():
                totals = summary[check_name]
                for counter in ("Passed_Checks", "Failed_Checks", "Skipped_Checks"):
                    totals[counter] += check_results[counter]
    else:
        # .get() rather than indexing: does not insert an entry for an unseen token.
        summary = self._trace_check_results_by_check.get(token, {})

    json_summary = json.dumps(summary)
    return web.HTTPOk(body=json_summary, content_type="application/json")

async def apmtelemetry(self) -> List[TelemetryEvent]:
"""Return the telemetry events stored by the agent"""
_events: List[TelemetryEvent] = []
Expand Down Expand Up @@ -420,6 +479,8 @@ async def _handle_traces(self, request: Request, version: Literal["v0.4", "v0.5"
checks: Checks = request.app["checks"]
headers = request.headers

# TODO: This method requires all checks are hard coded

await checks.check("trace_stall", headers=headers, request=request)

with CheckTrace.add_frame("headers") as f:
Expand Down Expand Up @@ -602,6 +663,7 @@ async def handle_session_requests(self, request: Request) -> web.Response:
self.handle_v01_pipelinestats,
self.handle_v2_apmtelemetry,
self.handle_v1_profiling,
self.handle_v07_remoteconfig,
):
continue
resp.append(
Expand Down Expand Up @@ -753,26 +815,34 @@ async def check_failure_middleware(self, request: Request, handler: _Handler) ->
try:
response = await handler(request)
except AssertionError as e:
token = request.get("session_token", "default")
self._trace_check_results_by_check = trace.get_results(self._trace_check_results_by_check)
token = request["session_token"]

# update trace_check results
trace.update_results(self._trace_check_results_by_check[token])

# only save trace failures to memory if necessary
msg = str(trace) + str(e)
if request.app["pool_trace_check_failures"]:
log.info(f"Storing Trace Check Failure for Session Token: {token}.")
self._trace_failures[token].append(msg)
# append failure to trace failures
self._trace_failures[token].append((trace, msg))
log.error(msg)
return web.HTTPBadRequest(body=msg)
else:
token = request["session_token"]
self._trace_check_results_by_check = trace.get_results(self._trace_check_results_by_check)
# update trace_check results
trace.update_results(self._trace_check_results_by_check[token])
if trace.has_fails():
# only save trace failures to memory if necessary
pool_failures = request.app["pool_trace_check_failures"]
log.error(f"Trace had the following failures, using config: token={token}, DD_POOL_TRACE_CHECK_FAILURES={pool_failures}")
log.error(
f"Trace had the following failures, using config: token={token}, DD_POOL_TRACE_CHECK_FAILURES={pool_failures}"
)
msg = str(trace)
if request.app["pool_trace_check_failures"]:
log.info(f"Storing Trace Check Failure for Session Token: {token}.")
self._trace_failures[token].append(msg)
# append failure to trace failures
self._trace_failures[token].append((trace, msg))
log.error(msg)
if request.app["disable_error_responses"]:
return response
Expand All @@ -782,7 +852,7 @@ async def check_failure_middleware(self, request: Request, handler: _Handler) ->


def make_app(
disabled_checks: List[str],
enabled_checks: List[str],
log_span_fmt: str,
snapshot_dir: str,
snapshot_ci_mode: bool,
Expand Down Expand Up @@ -833,6 +903,7 @@ def make_app(
web.get("/test/trace/analyze", agent.handle_trace_analyze),
web.get("/test/trace_check/failures", agent.get_trace_check_failures),
web.get("/test/trace_check/clear", agent.clear_trace_check_failures),
web.get("/test/trace_check/summary", agent.get_trace_check_summary),
]
)
checks = Checks(
Expand All @@ -844,7 +915,7 @@ def make_app(
CheckTracePeerService,
CheckTraceDDService
],
disabled=disabled_checks,
enabled=enabled_checks,
)
app["checks"] = checks
app["snapshot_dir"] = snapshot_dir
Expand Down Expand Up @@ -908,11 +979,11 @@ def main(args: Optional[List[str]] = None) -> None:
),
)
parser.add_argument(
"--disabled-checks",
"--enabled-checks",
type=List[str],
default=_parse_csv(os.environ.get("DISABLED_CHECKS", "")),
default=_parse_csv(os.environ.get("ENABLED_CHECKS", "")),
help=(
"Comma-separated values of checks to disable. None are disabled "
"Comma-separated values of checks to enable. None are enabled "
" by default. For the list of values see "
"https://github.com/datadog/dd-trace-test-agent"
),
Expand Down Expand Up @@ -992,7 +1063,7 @@ def main(args: Optional[List[str]] = None) -> None:
os.path.abspath(parsed_args.snapshot_dir),
)
app = make_app(
disabled_checks=parsed_args.disabled_checks,
enabled_checks=parsed_args.enabled_checks,
log_span_fmt=parsed_args.log_span_fmt,
snapshot_dir=parsed_args.snapshot_dir,
snapshot_ci_mode=parsed_args.snapshot_ci_mode,
Expand Down
Loading

0 comments on commit 7c1fe91

Please sign in to comment.