Skip to content

Commit

Permalink
Stop test execution if api response does not conform to schema (fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
otto-ifak committed Jan 13, 2025
1 parent 0860ab0 commit cafabe8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 36 deletions.
12 changes: 9 additions & 3 deletions aas_test_engines/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ def from_json(self, data: dict) -> "AasTestResult":

class ContextManager:

def __init__(self, result: AasTestResult):
def __init__(self, result: AasTestResult, catch_all_exceptions: bool):
self.result = result
self.catch_all_exceptions = catch_all_exceptions

def __enter__(self) -> AasTestResult:
managers.append(self)
Expand All @@ -128,6 +129,11 @@ def __exit__(self, exc_type, exc_val, traceback):
if managers:
managers[-1].result.append(self.result)
return True
elif self.catch_all_exceptions:
self.result.append(AasTestResult(f"Internal error: {exc_val}", Level.CRITICAL))
if managers:
managers[-1].result.append(self.result)
return True
else:
return False

Expand All @@ -151,9 +157,9 @@ def write(message: Union[str, AasTestResult]):
managers[-1].result.append(message)


def start(message: Union[str, AasTestResult]) -> ContextManager:
def start(message: Union[str, AasTestResult], catch_all_exceptions: bool = False) -> ContextManager:
result = _as_result(message, Level.INFO)
return ContextManager(result)
return ContextManager(result, catch_all_exceptions)


def abort(message: Union[str, AasTestResult]):
Expand Down
66 changes: 33 additions & 33 deletions aas_test_engines/test_cases/v3_0/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,39 +508,39 @@ def _invoke(request: Request, conf: ExecConf, positive_test: bool) -> requests.m


def _invoke_and_decode(request: Request, conf: ExecConf, positive_test: bool) -> dict:
with start(f"Invoke: {request.operation.method.upper()} {request.make_path()}"):
response = _execute(request, conf, positive_test)
expected_responses = []
expected_responses += [i for i in request.operation.responses if i.code == response.status_code]
expected_responses += [i for i in request.operation.responses if i.code is None]
if not expected_responses:
abort(f"Invalid status code {response.status_code}")
data = _get_json(response)
ref = expected_responses[0].schema.get('$ref')
if ref == '#/components/schemas/AssetAdministrationShell':
result, aas = parse_and_check_json(AssetAdministrationShell, data)
write(result)
return data
if ref == '#/components/schemas/Environment':
result, aas = parse_and_check_json(Environment, data)
write(result)
return data
if ref == '#/components/schemas/GetAssetAdministrationShellsResult':
entries = data.get('result', None)
if entries:
result, aas = parse_and_check_json(List[AssetAdministrationShell], entries)
write(result)
# Fallthrough to check against schema for paging_metadata

validator = parse_schema({**expected_responses[0].schema, '$schema': 'https://json-schema.org/draft/2020-12/schema'}, ParseConfig(raise_on_unknown_format=False))
validation_result = validator.validate(data)
if validation_result.ok:
write("Response conforms to schema")
else:
result = AasTestResult(f"Invalid response for schema", level=Level.ERROR)
_map_error(result, validation_result)
raise ResultException(result)
write(f"Invoke: {request.operation.method.upper()} {request.make_path()}")
response = _execute(request, conf, positive_test)
expected_responses = []
expected_responses += [i for i in request.operation.responses if i.code == response.status_code]
expected_responses += [i for i in request.operation.responses if i.code is None]
if not expected_responses:
abort(f"Invalid status code {response.status_code}")
data = _get_json(response)
ref = expected_responses[0].schema.get('$ref')
if ref == '#/components/schemas/AssetAdministrationShell':
result, aas = parse_and_check_json(AssetAdministrationShell, data)
write(result)
return data
if ref == '#/components/schemas/Environment':
result, aas = parse_and_check_json(Environment, data)
write(result)
return data
if ref == '#/components/schemas/GetAssetAdministrationShellsResult':
entries = data.get('result', None)
if entries:
result, aas = parse_and_check_json(List[AssetAdministrationShell], entries)
write(result)
# Fallthrough to check against schema for paging_metadata

validator = parse_schema({**expected_responses[0].schema, '$schema': 'https://json-schema.org/draft/2020-12/schema'}, ParseConfig(raise_on_unknown_format=False))
validation_result = validator.validate(data)
if validation_result.ok:
write("Response conforms to schema")
else:
result = AasTestResult(f"Invalid response for schema", level=Level.ERROR)
_map_error(result, validation_result)
raise ResultException(result)
return data


class ApiTestSuite:
Expand Down Expand Up @@ -1940,7 +1940,7 @@ def execute_tests(conf: ExecConf, suite: str) -> Tuple[AasTestResult, ConfusionM
for operation in _spec.open_api.operations.values():
if operation.operation_id not in operation_ids:
continue
with start(f"Checking {operation.method.upper()} {operation.path} ({operation.operation_id})"):
with start(f"Checking {operation.method.upper()} {operation.path} ({operation.operation_id})", True):
if conf.dry:
continue

Expand Down

0 comments on commit cafabe8

Please sign in to comment.