From a9298e8741ab9c1ba3aeefc0dadc195994775993 Mon Sep 17 00:00:00 2001 From: Joseph Heenan Date: Wed, 9 Jun 2021 00:00:39 +0100 Subject: [PATCH] run-test-plan.py: Run in parallel where possible Use python's asyncio / await / async to run modules within one test plan in parallel if they don't have an alias, and to run plans in parallel where they use no / different aliases. The parallelism of tests within a plan should speed up the CIBA tests and OIDC test modules where we use dcr and hence can proceed without any alias. Unfortunately the CIBA tests had to have parallelism disabled because Authlete's CIBA simulated authentication device doesn't seem to cope with multiple parallel authorizations. Reduces the time to run oidcc-basic-certification-test-plan from 313 seconds to 63 seconds on my local machine - most of which comes from the test that sleeps for 30 seconds before reusing an auth code. The ability to run plans with different aliases in parallel means we can run some of the FAPI tests in parallel with other tests. There may be further potential to speed this up by tweaking the aliases and redirect urls we use in (say) half of the FAPI tests. There was some hassle with python modules/node: I tried switching away from alpine for the reasons given here, as we were seeing issues installing the aiohttp module: https://pythonspeed.com/articles/alpine-docker-python/ but ended up switching back because the version of node (10) in Debian buster is ancient and doesn't support some of the crypto Fillip's client needs. Luckily the aiohttp module is one of the ones that can relatively easily be made to work on alpine. We add a retrying http client, as it seems either the parallelisation or something about the asyncio http client ends up giving weird http errors - I think we're running into the same bug as here: https://github.com/aio-libs/aiohttp/issues/4581 Switching to https://www.python-httpx.org might be an option to avoid that, but that's still in beta. part of #783 --- .gitlab-ci.yml | 8 +- ...cal-provider-oidcc-conformance-config.json | 27 +- .gitlab-ci/run-tests.sh | 8 +- scripts/conformance.py | 206 +++++++++++---- scripts/run-test-plan.py | 244 +++++++++++------- test/Dockerfile | 5 +- 6 files changed, 331 insertions(+), 167 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 5fd0ede08e..17646c93e8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -215,7 +215,7 @@ stop_normal: .deployment_test: &deployment_test stage: test interruptible: true - image: python:alpine + image: python:3.9-alpine only: refs: - branches @@ -516,8 +516,7 @@ local_test: function install_test_dependencies() { echo "Installing extra dependencies" - apk add -U openssh-client git - apk add --update nodejs nodejs-npm + apk add openssh-client git gcc musl-dev bash eval $(ssh-agent -s) echo "$SSH_PRIVATE_KEY" | ssh-add - cd ..
@@ -527,10 +526,11 @@ local_test: function set_up_for_running_test_plan() { install_test_dependencies - pip install requests + pip install aiohttp aiofiles aiohttp-retry } function run_client_test_plan() { + apk add nodejs npm echo "Running automated tests against $CONFORMANCE_SERVER" ../conformance-suite/.gitlab-ci/run-tests.sh --client-tests-only } diff --git a/.gitlab-ci/local-provider-oidcc-conformance-config.json b/.gitlab-ci/local-provider-oidcc-conformance-config.json index 2d575974fc..33bb4f6c39 100644 --- a/.gitlab-ci/local-provider-oidcc-conformance-config.json +++ b/.gitlab-ci/local-provider-oidcc-conformance-config.json @@ -1,5 +1,4 @@ { - "alias": "local-oidcc", "description": "oidc-provider OIDC", "server": { "discoveryUrl": "https://oidcc-provider:3000/.well-known/openid-configuration" @@ -54,7 +53,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -88,14 +87,14 @@ [ "wait", "contains", - "/test/a/local-oidcc/post", + "/test/*/post", 10 ] ] }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/post*" + "match": "*/test/*/post*" } ] } @@ -155,7 +154,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -223,7 +222,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -450,7 +449,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -529,7 +528,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -608,7 +607,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -687,7 +686,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -766,7 +765,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -845,7 +844,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -935,7 +934,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", @@ -1025,7 +1024,7 @@ }, { "task": "Verify Complete", - "match": "*/test/a/local-oidcc/callback*", + "match": "*/test/*/callback*", "commands": [ [ "wait", diff --git a/.gitlab-ci/run-tests.sh b/.gitlab-ci/run-tests.sh index 466767c86e..1f98299e6c 100755 --- a/.gitlab-ci/run-tests.sh +++ b/.gitlab-ci/run-tests.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash set -e cleanup() { @@ -32,8 +32,8 @@ EXPECTED_FAILURES_FILE="../conformance-suite/.gitlab-ci/expected-failures-server EXPECTED_SKIPS_FILE="../conformance-suite/.gitlab-ci/expected-skips-server.json|../conformance-suite/.gitlab-ci/expected-skips-ciba.json|../conformance-suite/.gitlab-ci/expected-skips-client.json" makeClientTest() { - . node-client-setup.sh - . node-core-client-setup.sh + . ./node-client-setup.sh + . 
./node-core-client-setup.sh # client FAPI1-ADVANCED TESTS="${TESTS} fapi1-advanced-final-client-test-plan[client_auth_type=private_key_jwt][fapi_profile=plain_fapi][fapi_auth_request_method=by_value][fapi_response_mode=plain_response][fapi_jarm_type=oidc] automated-ob-client-test.json" @@ -280,6 +280,7 @@ elif [ "$#" -eq 1 ] && [ "$1" = "--client-tests-only" ]; then TESTS="${TESTS} --expected-skips-file ${EXPECTED_SKIPS_FILE}" TESTS="${TESTS} --show-untested-test-modules client" TESTS="${TESTS} --export-dir ../conformance-suite" + TESTS="${TESTS} --no-parallel" # there seemed to be a lot of "Server disconnected" failures trying to run these in parallel echo "Run client tests" makeClientTest elif [ "$#" -eq 1 ] && [ "$1" = "--server-tests-only" ]; then @@ -299,6 +300,7 @@ elif [ "$#" -eq 1 ] && [ "$1" = "--ciba-tests-only" ]; then TESTS="${TESTS} --expected-skips-file ${EXPECTED_SKIPS_FILE}" TESTS="${TESTS} --show-untested-test-modules ciba" TESTS="${TESTS} --export-dir ../conformance-suite" + TESTS="${TESTS} --no-parallel" # the authlete authentication device simulator doesn't seem to support parallel authorizations echo "Run ciba tests" makeCIBATest elif [ "$#" -eq 1 ] && [ "$1" = "--local-provider-tests" ]; then diff --git a/scripts/conformance.py b/scripts/conformance.py index fef703f391..6846482a20 100644 --- a/scripts/conformance.py +++ b/scripts/conformance.py @@ -11,112 +11,208 @@ import os import shutil import time - +import aiohttp +import asyncio +import aiofiles +import string +import re +from types import SimpleNamespace + +from aiohttp import ClientSession, TraceConfig +from aiohttp_retry import RetryClient, ExponentialRetry + +# Retrying on the ClientPayloadError appears to be needed because of a bug in aiohttp +# possibly https://github.com/aio-libs/aiohttp/issues/4581 +retry_options = ExponentialRetry(attempts=5, start_timeout=1, exceptions=[aiohttp.ClientPayloadError, aiohttp.ServerDisconnectedError]) +http_debug = False + +replchars = re.compile('([^' + re.escape(string.printable) + '])') +def nonprintable_to_hex_inner(match): + return r'\x{0:02x}'.format(ord(match.group())) + +def nonprintable_to_hex(s): + return replchars.sub(nonprintable_to_hex_inner, s) + +async def on_request_start( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceRequestStartParams, +) -> None: + current_attempt = trace_config_ctx.trace_request_ctx['current_attempt'] + trace_config_ctx.what = "{} {}".format(params.method, params.url) + print('{} {} attempt {} of {}'.format(params.method, params.url, current_attempt, retry_options.attempts)) + +async def on_request_end( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceRequestEndParams, +) -> None: + current_attempt = trace_config_ctx.trace_request_ctx['current_attempt'] + trace_config_ctx.what = "{} {}".format(params.method, params.url) + if params.response.status in [200,201]: + body = await params.response.text('ISO-8859-1') + print('{} {} {} {} headers {} body starts "{}"'.format(params.method, params.url, params.response.status, params.response.reason, params.response.headers, nonprintable_to_hex(body[:100]))) + else: + print('{} {} {} {} headers {} body {}'.format(params.method, params.url, params.response.status, params.response.reason, params.response.headers, await params.response.text())) + + +async def on_request_exception( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceRequestExceptionParams, +) -> None: + print('{} {} exception 
{}'.format(params.method, params.url, params.exception)) + +async def on_connection_reuseconn( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceConnectionReuseconnParams, +) -> None: + print('{} on_connection_reuseconn'.format(trace_config_ctx.what)) + +async def on_connection_queued_start( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceConnectionQueuedStartParams, +) -> None: + print('{} on_connection_queued_start'.format(trace_config_ctx.what)) + +async def on_connection_queued_end( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceConnectionQueuedEndParams, +) -> None: + print('{} on_connection_queued_end'.format(trace_config_ctx.what)) + +async def on_connection_create_start( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceConnectionCreateStartParams, +) -> None: + print('{} on_connection_create_start'.format(trace_config_ctx.what)) + +async def on_connection_create_end( + session: ClientSession, + trace_config_ctx: SimpleNamespace, + params: aiohttp.TraceConnectionCreateEndParams, +) -> None: + print('{} on_connection_create_end'.format(trace_config_ctx.what)) class Conformance(object): - def __init__(self, api_url_base, api_token, requests_session): + def __init__(self, api_url_base, api_token, verify_ssl): if not api_url_base.endswith('/'): api_url_base += "/" self.api_url_base = api_url_base - self.requests_session = requests_session + headers = {'Content-Type': 'application/json'} if api_token is not None: headers['Authorization'] = 'Bearer {0}'.format(api_token) - self.requests_session.headers = headers - - def get_all_test_modules(self): + conn = aiohttp.TCPConnector(verify_ssl=verify_ssl) + trace_config = TraceConfig() + if http_debug: + trace_config.on_request_start.append(on_request_start) + trace_config.on_request_end.append(on_request_end) + trace_config.on_request_exception.append(on_request_exception) + trace_config.on_connection_reuseconn.append(on_connection_reuseconn) + trace_config.on_connection_queued_start.append(on_connection_queued_start) + trace_config.on_connection_queued_end.append(on_connection_queued_end) + trace_config.on_connection_create_start.append(on_connection_create_start) + trace_config.on_connection_create_end.append(on_connection_create_end) + self.requests_session = RetryClient(raise_for_status=False, retry_options=retry_options, headers=headers, connector=conn, trace_configs=[trace_config]) + + async def get_all_test_modules(self): """ Returns an array containing a dictionary per test module """ api_url = '{0}api/runner/available'.format(self.api_url_base) - response = self.requests_session.get(api_url) - - if response.status_code != 200: - raise Exception("get_all_test_modules failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + async with self.requests_session.get(api_url) as response: + if response.status != 200: + raise Exception("get_all_test_modules failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def exporthtml(self, plan_id, path): + async def exporthtml(self, plan_id, path): api_url = '{0}api/plan/exporthtml/{1}'.format(self.api_url_base, plan_id) - with self.requests_session.get(api_url, stream=True) as response: - if response.status_code != 200: - raise Exception("exporthtml failed - HTTP {:d} {}".format(response.status_code, response.content)) + async with 
self.requests_session.get(api_url) as response: + if response.status != 200: + raise Exception("exporthtml failed - HTTP {:d} {}".format(response.status, await response.text())) d = response.headers['content-disposition'] local_filename = re.findall("filename=\"(.+)\"", d)[0] full_path = os.path.join(path, local_filename) - with open(full_path, 'wb') as f: - shutil.copyfileobj(response.raw, f) + f = await aiofiles.open(full_path, mode='wb') + await f.write(await response.read()) + await f.close() return full_path - def create_test_plan(self, name, configuration, variant=None): + async def create_test_plan(self, name, configuration, variant=None): api_url = '{0}api/plan'.format(self.api_url_base) payload = {'planName': name} if variant != None: payload['variant'] = json.dumps(variant) - response = self.requests_session.post(api_url, params=payload, data=configuration) + response = await self.requests_session.post(api_url, params=payload, data=configuration) - if response.status_code != 201: - raise Exception("create_test_plan failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 201: + raise Exception("create_test_plan failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def create_test(self, test_name, configuration): + async def create_test(self, test_name, configuration): api_url = '{0}api/runner'.format(self.api_url_base) payload = {'test': test_name} - response = self.requests_session.post(api_url, params=payload, data=configuration) + response = await self.requests_session.post(api_url, params=payload, data=configuration) - if response.status_code != 201: - raise Exception("create_test failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 201: + raise Exception("create_test failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def create_test_from_plan(self, plan_id, test_name): + async def create_test_from_plan(self, plan_id, test_name): api_url = '{0}api/runner'.format(self.api_url_base) payload = {'test': test_name, 'plan': plan_id} - response = self.requests_session.post(api_url, params=payload) + response = await self.requests_session.post(api_url, params=payload) - if response.status_code != 201: - raise Exception("create_test_from_plan failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 201: + raise Exception("create_test_from_plan failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def create_test_from_plan_with_variant(self, plan_id, test_name, variant): + async def create_test_from_plan_with_variant(self, plan_id, test_name, variant): api_url = '{0}api/runner'.format(self.api_url_base) payload = {'test': test_name, 'plan': plan_id} if variant != None: payload['variant'] = json.dumps(variant) - response = self.requests_session.post(api_url, params=payload) + response = await self.requests_session.post(api_url, params=payload) - if response.status_code != 201: - raise Exception("create_test_from_plan failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 201: + raise Exception("create_test_from_plan failed - HTTP {:d} {}".format(response.status, await 
response.text())) + return await response.json() - def get_module_info(self, module_id): + async def get_module_info(self, module_id): api_url = '{0}api/info/{1}'.format(self.api_url_base, module_id) - response = self.requests_session.get(api_url) + response = await self.requests_session.get(api_url) - if response.status_code != 200: - raise Exception("get_module_info failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 200: + raise Exception("get_module_info failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def get_test_log(self, module_id): + async def get_test_log(self, module_id): api_url = '{0}api/log/{1}'.format(self.api_url_base, module_id) - response = self.requests_session.get(api_url) + response = await self.requests_session.get(api_url) - if response.status_code != 200: - raise Exception("get_test_log failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 200: + raise Exception("get_test_log failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def start_test(self, module_id): + async def start_test(self, module_id): api_url = '{0}api/runner/{1}'.format(self.api_url_base, module_id) - response = self.requests_session.post(api_url) + response = await self.requests_session.post(api_url) - if response.status_code != 200: - raise Exception("start_test failed - HTTP {:d} {}".format(response.status_code, response.content)) - return json.loads(response.content.decode('utf-8')) + if response.status != 200: + raise Exception("start_test failed - HTTP {:d} {}".format(response.status, await response.text())) + return await response.json() - def wait_for_state(self, module_id, required_states, timeout=240): + async def wait_for_state(self, module_id, required_states, timeout=240): timeout_at = time.time() + timeout while True: if time.time() > timeout_at: raise Exception("Timed out waiting for test module {} to be in one of states: {}". 
format(module_id, required_states)) - info = self.get_module_info(module_id) + info = await self.get_module_info(module_id) status = info['status'] print("module id {} status is {}".format(module_id, status)) @@ -125,4 +221,4 @@ def wait_for_state(self, module_id, required_states, timeout=240): if status == 'INTERRUPTED': raise Exception("Test module {} has moved to INTERRUPTED".format(module_id)) - time.sleep(1) + await asyncio.sleep(1) diff --git a/scripts/run-test-plan.py b/scripts/run-test-plan.py index 673bfc35c5..bdbf745458 100755 --- a/scripts/run-test-plan.py +++ b/scripts/run-test-plan.py @@ -11,8 +11,8 @@ import time import subprocess import fnmatch +import asyncio -import requests import json import argparse import traceback @@ -63,7 +63,7 @@ def split_name_and_variant(test_plan): #Run OIDCC RP tests #OIDCC RP tests use a configuration file instead of providing all options in run-tests.sh #this function runs plans in a loop and returns an array of results -def run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, oidcc_rptest_configfile, output_dir): +async def run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, oidcc_rptest_configfile, output_dir): oidcc_test_config_json = [] with open(oidcc_rptest_configfile) as f: oidcc_test_config = f.read() @@ -83,7 +83,7 @@ def run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, oidcc_rptes pass #remove client_metadata_defaults otherwise plan api call will fail del test_plan_config['client_metadata_defaults'] - test_plan_info = conformance.create_test_plan(test_plan_name, json_config, test_plan_config) + test_plan_info = await conformance.create_test_plan(test_plan_name, json_config, test_plan_config) variantstr = json.dumps(test_plan_config) print('VARIANT {}'.format(variantstr)) @@ -106,14 +106,14 @@ def run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, oidcc_rptes try: print('Running test module: {}'.format(module)) - test_module_info = conformance.create_test_from_plan(plan_id, module) + test_module_info = await conformance.create_test_from_plan(plan_id, module) module_id = test_module_info['id'] module_info['id'] = module_id test_info[module] = module_info print('Created test module, new id: {}'.format(module_id)) print('{}log-detail.html?log={}'.format(api_url_base, module_id)) - state = conformance.wait_for_state(module_id, ["WAITING", "FINISHED"]) + state = await conformance.wait_for_state(module_id, ["WAITING", "FINISHED"]) if state == "WAITING": oidcc_issuer_str = os.environ["CONFORMANCE_SERVER"] + os.environ["OIDCC_TEST_CONFIG_ALIAS"] @@ -130,22 +130,22 @@ def run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, oidcc_rptes os.putenv('ISSUER', oidcc_issuer_str) subprocess.call(["npm", "run", "client"], cwd="./sample-openid-client-nodejs") - conformance.wait_for_state(module_id, ["FINISHED"]) + await conformance.wait_for_state(module_id, ["FINISHED"]) except Exception as e: traceback.print_exc() - print('Exception: Test {} failed to run to completion: {}'.format(module, e)) + print('Exception: Test {} {} failed to run to completion: {}'.format(module, module_id, e)) if module_id != '': test_time_taken[module_id] = time.time() - test_start_time - module_info['info'] = conformance.get_module_info(module_id) - module_info['logs'] = conformance.get_test_log(module_id) + module_info['info'] = await conformance.get_module_info(module_id) + module_info['logs'] = await conformance.get_test_log(module_id) time_for_plan = time.time() - start_time_for_plan print('Finished test 
plan - id: {} total time: {}'.format(plan_id, time_for_plan)) if output_dir != None: start_time_for_save = time.time() - filename = conformance.exporthtml(plan_id, output_dir) - print('results saved to "{}" in {:.1f} seconds'.format(filename, time.time() - start_time_for_save)) + filename = await conformance.exporthtml(plan_id, output_dir) + print('{} results saved to "{}" in {:.1f} seconds'.format(plan_id, filename, time.time() - start_time_for_save)) print('\n\n') result_for_plan = { 'test_plan': test_plan_name, @@ -174,8 +174,28 @@ def get_string_name_for_module_with_variant(moduledict): name += "[{}={}]".format(v, variants[v]) return name - -def run_test_plan(test_plan, config_file, output_dir): +async def queue_worker(q): + while True: + code = await q.get() + try: + await code + except Exception as e: + # log and ignore all exceptions, as run_queue otherwise locks up + print('Exception caught in queue_worker: {}'.format(e)) + finally: + q.task_done() + +async def run_queue(q, parallel_jobs): + workers = [asyncio.create_task(queue_worker(q)) for _ in range(parallel_jobs)] + await q.join() # wait for all tasks to be processed + print("queue done, cancelling workers") + for worker in workers: + worker.cancel() + print("workers cancelled, gathering") + await asyncio.gather(*workers, return_exceptions=True) + print("workers gathered") + +async def run_test_plan(test_plan, config_file, output_dir): print("Running plan '{}' with configuration file '{}'".format(test_plan, config_file)) start_section(test_plan, "Results", True) with open(config_file) as f: @@ -183,9 +203,17 @@ def run_test_plan(test_plan, config_file, output_dir): (test_plan_name, variant) = split_name_and_variant(test_plan) if test_plan_name.startswith('oidcc-client-'): #for oidcc client tests 'variant' will contain the rp tests configuration file name - return run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, variant, output_dir) - test_plan_info = conformance.create_test_plan(test_plan_name, json_config, variant) + return await run_test_plan_oidcc_rp(test_plan_name, config_file, json_config, variant, output_dir) + test_plan_info = await conformance.create_test_plan(test_plan_name, json_config, variant) plan_id = test_plan_info['id'] + parsed_config = json.loads(json_config) + parallel_jobs = 3 + if args.no_parallel: + parallel_jobs = 1 + print("{}: no_parallel command line argument passed - not running tests in parallel".format(plan_id)) + elif "alias" in parsed_config: + parallel_jobs = 1 + print("{}: Config '{}' contains an alias - not running tests in parallel. 
If the test supports dynamic client registration and you have enabled it, you can remove the alias from your configuration file to speed up tests.".format(plan_id, config_file)) plan_modules = test_plan_info['modules'] test_info = {} # key is module name test_time_taken = {} # key is module_id @@ -193,69 +221,14 @@ def run_test_plan(test_plan, config_file, output_dir): print('Created test plan, new id: {}'.format(plan_id)) print('{}plan-detail.html?plan={}'.format(api_url_base, plan_id)) print('{:d} modules to test:\n{}\n'.format(len(plan_modules), '\n'.join(mod['testModule'] for mod in plan_modules))) + queue = asyncio.Queue() for moduledict in plan_modules: - module=moduledict['testModule'] - module_with_variants = get_string_name_for_module_with_variant(moduledict) - test_start_time = time.time() - module_id = '' - module_info = {} - - try: - print('Running test module: {}'.format(module_with_variants)) - test_module_info = conformance.create_test_from_plan_with_variant(plan_id, module, moduledict.get('variant')) - module_id = test_module_info['id'] - module_info['id'] = module_id - test_info[get_string_name_for_module_with_variant(moduledict)] = module_info - print('Created test module, new id: {}'.format(module_id)) - print('{}log-detail.html?log={}'.format(api_url_base, module_id)) - - state = conformance.wait_for_state(module_id, ["CONFIGURED", "WAITING", "FINISHED"]) - if state == "CONFIGURED": - if module == 'oidcc-server-rotate-keys': - # This test needs manually started once the OP keys have been rotated; we can't actually do that - # but at least we can run the test and check it finishes even if it always fails. - print('Starting test') - conformance.start_test(module_id) - state = conformance.wait_for_state(module_id, ["WAITING", "FINISHED"]) - - if state == "WAITING": - # If it's a client test, we need to run the client. - # please note oidcc client tests are handled in a separate method. 
only FAPI ones will reach here - if re.match(r'fapi-rw-id2-client-.*', module) or \ - re.match(r'fapi1-advanced-final-client-.*', module): - print("FAPI client test: " + module + " " + json.dumps(variant)) - profile = variant['fapi_profile'] - os.putenv('CLIENTTESTMODE', 'fapi-ob' if re.match(r'openbanking', profile) else 'fapi-rw') - os.environ['ISSUER'] = os.environ["CONFORMANCE_SERVER"] + os.environ["TEST_CONFIG_ALIAS"] - if 'fapi_auth_request_method' in variant.keys() and variant['fapi_auth_request_method']: - os.environ['FAPI_AUTH_REQUEST_METHOD'] = variant['fapi_auth_request_method'] - else: - os.environ['FAPI_AUTH_REQUEST_METHOD'] = 'by_value' - if 'fapi_response_mode' in variant.keys() and variant['fapi_response_mode']: - os.environ['FAPI_RESPONSE_MODE'] = variant['fapi_response_mode'] - else: - os.environ['FAPI_RESPONSE_MODE'] = 'plain_response' - if 'fapi_jarm_type' in variant.keys() and variant['fapi_jarm_type']: - os.environ['FAPI_JARM_TYPE'] = variant['fapi_jarm_type'] - else: - os.environ['FAPI_JARM_TYPE'] = 'oidc' - - os.environ['TEST_MODULE_NAME'] = module - subprocess.call(["npm", "run", "client"], cwd="./sample-openbanking-client-nodejs") - - conformance.wait_for_state(module_id, ["FINISHED"]) - - except Exception as e: - traceback.print_exc() - print('Exception: Test {} failed to run to completion: {}'.format(module_with_variants, e)) - if module_id != '': - test_time_taken[module_id] = time.time() - test_start_time - module_info['info'] = conformance.get_module_info(module_id) - module_info['logs'] = conformance.get_test_log(module_id) + queue.put_nowait(run_test_module(moduledict, plan_id, test_info, test_time_taken, variant)) + await run_queue(queue, parallel_jobs) overall_time = time.time() - overall_start_time if output_dir != None: start_time_for_save = time.time() - filename = conformance.exporthtml(plan_id, output_dir) + filename = await conformance.exporthtml(plan_id, output_dir) print('results saved to "{}" in {:.1f} seconds'.format(filename, time.time() - start_time_for_save)) end_section(test_plan) print('\n\n') @@ -270,6 +243,67 @@ def run_test_plan(test_plan, config_file, output_dir): } +async def run_test_module(moduledict, plan_id, test_info, test_time_taken, variant): + module = moduledict['testModule'] + module_with_variants = get_string_name_for_module_with_variant(moduledict) + test_start_time = time.time() + module_id = '' + module_info = {} + try: + print('Running test module: {}'.format(module_with_variants)) + test_module_info = await conformance.create_test_from_plan_with_variant(plan_id, module, + moduledict.get('variant')) + module_id = test_module_info['id'] + module_info['id'] = module_id + test_info[get_string_name_for_module_with_variant(moduledict)] = module_info + print('Created test module, new id: {}'.format(module_id)) + print('{}log-detail.html?log={}'.format(api_url_base, module_id)) + + state = await conformance.wait_for_state(module_id, ["CONFIGURED", "WAITING", "FINISHED"]) + if state == "CONFIGURED": + if module == 'oidcc-server-rotate-keys': + # This test needs manually started once the OP keys have been rotated; we can't actually do that + # but at least we can run the test and check it finishes even if it always fails. + print('Starting test') + await conformance.start_test(module_id) + state = await conformance.wait_for_state(module_id, ["WAITING", "FINISHED"]) + + if state == "WAITING": + # If it's a client test, we need to run the client. + # please note oidcc client tests are handled in a separate method. 
only FAPI ones will reach here + if re.match(r'fapi-rw-id2-client-.*', module) or \ + re.match(r'fapi1-advanced-final-client-.*', module): + print("FAPI client test: " + module + " " + json.dumps(variant)) + profile = variant['fapi_profile'] + os.putenv('CLIENTTESTMODE', 'fapi-ob' if re.match(r'openbanking', profile) else 'fapi-rw') + os.environ['ISSUER'] = os.environ["CONFORMANCE_SERVER"] + os.environ["TEST_CONFIG_ALIAS"] + if 'fapi_auth_request_method' in variant.keys() and variant['fapi_auth_request_method']: + os.environ['FAPI_AUTH_REQUEST_METHOD'] = variant['fapi_auth_request_method'] + else: + os.environ['FAPI_AUTH_REQUEST_METHOD'] = 'by_value' + if 'fapi_response_mode' in variant.keys() and variant['fapi_response_mode']: + os.environ['FAPI_RESPONSE_MODE'] = variant['fapi_response_mode'] + else: + os.environ['FAPI_RESPONSE_MODE'] = 'plain_response' + if 'fapi_jarm_type' in variant.keys() and variant['fapi_jarm_type']: + os.environ['FAPI_JARM_TYPE'] = variant['fapi_jarm_type'] + else: + os.environ['FAPI_JARM_TYPE'] = 'oidc' + + os.environ['TEST_MODULE_NAME'] = module + subprocess.call(["npm", "run", "client"], cwd="./sample-openbanking-client-nodejs") + + await conformance.wait_for_state(module_id, ["FINISHED"]) + + except Exception as e: + traceback.print_exc() + print('Exception: Test {} {} failed to run to completion: {}'.format(module_with_variants, module_id, e)) + if module_id != '': + test_time_taken[module_id] = time.time() - test_start_time + module_info['info'] = await conformance.get_module_info(module_id) + module_info['logs'] = await conformance.get_test_log(module_id) + + # from http://stackoverflow.com/a/26445590/3191896 and https://gist.github.com/Jossef/0ee20314577925b4027f def color(text, **user_styles): @@ -869,6 +903,7 @@ def parser_args_cli(): parser = argparse.ArgumentParser(description='Parser arguments list which is supplied by the user') parser.add_argument('--export-dir', help='Directory to save exported results into', default=None) + parser.add_argument('--no-parallel', help='Disable parallel running of tests', action='store_true') parser.add_argument('--show-untested-test-modules', help='Flag to require show or do not show test modules which were untested', default='') parser.add_argument('--expected-failures-file', help='Json configuration file name which records a list of expected failures/warnings', default='') parser.add_argument('--expected-skips-file', help='Json configuration file name which records a list of expected skipped tests', default='') @@ -899,8 +934,19 @@ def end_section(name): sys.stdout.flush() sys.stderr.flush() -if __name__ == '__main__': - requests_session = requests.Session() +async def run_test_plan_wrapper(plan_name, config_json, export_dir): + result = await run_test_plan(plan_name, config_json, export_dir) + if isinstance(result, list): + results.extend(result) + else: + results.append(result) + + +async def main(): + global conformance + global api_url_base + global untested_test_modules + global args dev_mode = 'CONFORMANCE_DEV_MODE' in os.environ @@ -925,13 +971,6 @@ def end_section(name): else: token = os.environ['CONFORMANCE_TOKEN'] - if dev_mode or 'DISABLE_SSL_VERIFY' in os.environ: - # disable https certificate validation - requests_session.verify = False - import urllib3 - - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - args = parser_args_cli() show_untested = args.show_untested_test_modules params = args.params @@ -945,11 +984,12 @@ def end_section(name): to_run.append((params[0], params[1])) 
params = params[2:] - conformance = Conformance(api_url_base, token, requests_session) + verify_ssl = not dev_mode and not 'DISABLE_SSL_VERIFY' in os.environ + conformance = Conformance(api_url_base, token, verify_ssl) for attempt in range(1, 12): try: - all_test_modules_array = conformance.get_all_test_modules() + all_test_modules_array = await conformance.get_all_test_modules() break except Exception as exc: # the server may not have finished starting yet; sleep & try again @@ -970,13 +1010,36 @@ def end_section(name): if args.expected_skips_file: expected_skips_list = load_expected_problems(args.expected_skips_file) + global results results = [] + queues = {} # key is alias for test plan, or None + workers = [] for (plan_name, config_json) in to_run: - result = run_test_plan(plan_name, config_json, args.export_dir) - if isinstance(result, list): - results.extend(result) + with open(config_json) as f: + json_config = f.read() + parsed_config = json.loads(json_config) + if args.no_parallel: + alias = None else: - results.append(result) + alias = parsed_config["alias"] if "alias" in parsed_config else None + if alias not in queues: + queues[alias] = asyncio.Queue() + parallel_jobs = 2 if alias == None and not args.no_parallel else 1 + print("Creating queue for "+str(alias)+" parallel="+str(parallel_jobs)) + workers.extend([asyncio.create_task(queue_worker(queues[alias])) for _ in range(parallel_jobs)]) + queues[alias].put_nowait(run_test_plan_wrapper(plan_name, config_json, args.export_dir)) + + for q in queues: + print("plan: joining "+str(q)) + await queues[q].join() + print("plan: queues done, cancelling workers") + for worker in workers: + worker.cancel() + print("plan: workers cancelled, gathering") + await asyncio.gather(*workers, return_exceptions=True) + print("plan: workers gathered") + + await conformance.requests_session.close() print("\n\nScript complete - results:") @@ -1116,3 +1179,6 @@ def is_unused(obj): print(success("All tests ran to completion. See above for any test condition failures.")) sys.exit(0) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/test/Dockerfile b/test/Dockerfile index 9713b22d36..55f10e18f3 100644 --- a/test/Dockerfile +++ b/test/Dockerfile @@ -1,2 +1,3 @@ -FROM python:alpine -RUN pip install requests +FROM python:3.9-alpine +RUN apk add gcc musl-dev bash +RUN pip install aiohttp aiofiles aiohttp-retry
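
The within-plan parallelism described in the commit message boils down to queueing coroutines and draining the queue with a small pool of workers. A minimal, runnable sketch of that queue/worker pattern, condensed from run-test-plan.py; fake_test_module and the module names are placeholders standing in for create_test_from_plan_with_variant / wait_for_state:

    import asyncio
    import random

    async def queue_worker(q):
        # Await queued coroutines one at a time; swallow exceptions so one
        # failing module doesn't leave q.join() blocked forever.
        while True:
            coro = await q.get()
            try:
                await coro
            except Exception as e:
                print('Exception caught in queue_worker: {}'.format(e))
            finally:
                q.task_done()

    async def run_queue(q, parallel_jobs):
        workers = [asyncio.create_task(queue_worker(q)) for _ in range(parallel_jobs)]
        await q.join()  # wait until every queued coroutine has completed
        for worker in workers:
            worker.cancel()  # the workers loop forever, so cancel them explicitly
        await asyncio.gather(*workers, return_exceptions=True)

    async def fake_test_module(name):
        # placeholder for create_test_from_plan_with_variant + wait_for_state
        await asyncio.sleep(random.uniform(0.1, 0.5))
        print('finished {}'.format(name))

    async def main():
        queue = asyncio.Queue()
        for name in ['module-a', 'module-b', 'module-c', 'module-d']:
            queue.put_nowait(fake_test_module(name))  # queue coroutine objects, not tasks
        await run_queue(queue, parallel_jobs=3)  # up to 3 modules in flight at once

    if __name__ == '__main__':
        asyncio.run(main())

Catching Exception in the worker mirrors the patch for the same reason stated in its comment: an unhandled error would kill the worker and run_queue would otherwise lock up on q.join().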
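
The cross-plan parallelism keys queues on the configuration's alias: plans sharing an alias get a single-worker queue and therefore run serially, while plans with no alias share a queue drained by two workers. A self-contained sketch of that scheduling under those assumptions; run_plan stands in for run_test_plan_wrapper and the alias value is hypothetical:

    import asyncio

    async def queue_worker(q):
        # One worker per alias keeps plans sharing that alias strictly serial.
        while True:
            coro = await q.get()
            try:
                await coro
            except Exception as e:
                print('Exception caught in queue_worker: {}'.format(e))
            finally:
                q.task_done()

    async def run_plan(name):
        # placeholder for run_test_plan_wrapper
        print('running plan {}'.format(name))
        await asyncio.sleep(0.2)

    async def schedule(plans):
        # plans: list of (plan_name, parsed config dict)
        queues = {}   # key is the config's alias, or None when there is no alias
        workers = []
        for plan_name, config in plans:
            alias = config.get('alias')
            if alias not in queues:
                queues[alias] = asyncio.Queue()
                parallel_jobs = 2 if alias is None else 1
                workers.extend(asyncio.create_task(queue_worker(queues[alias]))
                               for _ in range(parallel_jobs))
            queues[alias].put_nowait(run_plan(plan_name))
        for q in queues.values():
            await q.join()
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    if __name__ == '__main__':
        asyncio.run(schedule([
            ('oidcc-basic-certification-test-plan', {}),              # no alias: can overlap
            ('fapi1-advanced-final-test-plan', {'alias': 'fapi-1'}),  # hypothetical alias
            ('fapi1-advanced-final-test-plan', {'alias': 'fapi-1'}),  # runs after the one above
        ]))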
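
The retrying HTTP client is aiohttp-retry's RetryClient wrapped around an aiohttp session, retrying on the ClientPayloadError / ServerDisconnectedError failures linked in the commit message. A rough standalone sketch, assuming the same aiohttp-retry options conformance.py uses and a placeholder URL:

    import asyncio
    import aiohttp
    from aiohttp_retry import RetryClient, ExponentialRetry

    # Retry transient failures, including the ClientPayloadError that
    # https://github.com/aio-libs/aiohttp/issues/4581 can produce.
    retry_options = ExponentialRetry(
        attempts=5,
        start_timeout=1,
        exceptions=[aiohttp.ClientPayloadError, aiohttp.ServerDisconnectedError])

    async def fetch_json(url):
        # verify_ssl=False mirrors the suite's dev-mode behaviour; drop it for real hosts
        connector = aiohttp.TCPConnector(verify_ssl=False)
        async with RetryClient(retry_options=retry_options, connector=connector,
                               raise_for_status=False) as client:
            async with client.get(url) as response:
                if response.status != 200:
                    raise Exception('HTTP {:d} {}'.format(response.status, await response.text()))
                return await response.json()

    if __name__ == '__main__':
        # placeholder URL - substitute your conformance suite's /api/runner/available endpoint
        print(asyncio.run(fetch_json('https://example.com/api/runner/available')))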