From 2f4c26e6bd8a8f61bc7c8ba361ee831df5207d99 Mon Sep 17 00:00:00 2001 From: Parth Mandaliya Date: Fri, 11 Aug 2023 14:57:50 +0530 Subject: [PATCH] Files added required to test experimental aggregator based workflow by GitHUB actions Signed-off-by: Parth Mandaliya --- .github/workflows/taskrunner.yml | 3 + .../test_experimental_agg_based_workflow.py | 90 +++++++++++ .../src/testflow_datastore_cli.py | 4 +- tests/github/experimental/workspace/utils.py | 142 ++++++++++++++++++ 4 files changed, 237 insertions(+), 2 deletions(-) create mode 100644 tests/github/experimental/workspace/test_experimental_agg_based_workflow.py create mode 100644 tests/github/experimental/workspace/utils.py diff --git a/.github/workflows/taskrunner.yml b/.github/workflows/taskrunner.yml index 9d8d79d47bf..c84ed689fdb 100644 --- a/.github/workflows/taskrunner.yml +++ b/.github/workflows/taskrunner.yml @@ -31,3 +31,6 @@ jobs: - name: Test TaskRunner API run: | python -m tests.github.test_hello_federation --template keras_cnn_mnist --fed_workspace aggregator --col1 col1 --col2 col2 --rounds-to-train 3 --save-model output_model + - name: Test Experimental Aggregator Based Workflow API + run: | + python -m tests.github.experimental.workspace.test_experimental_agg_based_workflow --template testcase_datastore_cli --fed_workspace aggregator --col col1 --col col2 --rounds-to-train 1 diff --git a/tests/github/experimental/workspace/test_experimental_agg_based_workflow.py b/tests/github/experimental/workspace/test_experimental_agg_based_workflow.py new file mode 100644 index 00000000000..830ee33c348 --- /dev/null +++ b/tests/github/experimental/workspace/test_experimental_agg_based_workflow.py @@ -0,0 +1,90 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import time +import socket +import argparse +from pathlib import Path +from subprocess import check_call +from concurrent.futures import ProcessPoolExecutor +from sys import executable +import shutil 
+from openfl.utilities.utils import rmtree +from tests.github.experimental.workspace.utils import create_collaborator +from tests.github.experimental.workspace.utils import create_certified_workspace +from tests.github.experimental.workspace.utils import certify_aggregator + + +if __name__ == '__main__': + # Test the pipeline + parser = argparse.ArgumentParser() + workspace_choice = [] + with os.scandir('tests/github/experimental/workspace') as iterator: + for entry in iterator: + if entry.name not in ['__init__.py', 'workspace', 'default']: + workspace_choice.append(entry.name) + parser.add_argument('--template', default='testcase_include_exclude', choices=workspace_choice) + parser.add_argument('--fed_workspace', default='fed_work12345alpha81671') + parser.add_argument('--col', action='append', default=[]) + parser.add_argument('--rounds-to-train') + + origin_dir = Path.cwd().resolve() + args = parser.parse_args() + fed_workspace = args.fed_workspace + archive_name = f'{fed_workspace}.zip' + fqdn = socket.getfqdn() + template = args.template + rounds_to_train = args.rounds_to_train + collaborators = args.col + # START + # ===== + # Make sure you are in a Python virtual environment with the FL package installed. 
+ + source_directory = origin_dir / 'tests'/'github'/'experimental'/'workspace' / template + destination_directory = origin_dir / 'openfl-workspace' / 'experimental' / template + if os.path.exists(destination_directory): + shutil.rmtree(destination_directory) + + # Copy template to the destination directory + shutil.copytree(src=source_directory, dst=destination_directory) + + check_call([executable, '-m', 'pip', 'install', '.']) + + # Activate experimental + check_call(['fx', 'experimental', 'activate']) + + create_certified_workspace(fed_workspace, template, fqdn, rounds_to_train) + certify_aggregator(fqdn) + + # Get the absolute directory path for the workspace + workspace_root = Path().resolve() + + # Create Collaborators + for collab in collaborators: + create_collaborator( + collab, workspace_root, archive_name, fed_workspace + ) + + # Run the federation + with ProcessPoolExecutor(max_workers=len(collaborators) + 1) as executor: + executor.submit( + check_call, ['fx', 'aggregator', 'start'], cwd=workspace_root + ) + time.sleep(5) + + for collab in collaborators: + col_dir = workspace_root / collab / fed_workspace + executor.submit( + check_call, ['fx', 'collaborator', 'start', '-n', collab], + cwd=col_dir + ) + + os.chdir(origin_dir) + rmtree(workspace_root) + + # Remove template to the destination directory + shutil.rmtree(destination_directory) + + # Deactivate experimental + check_call(['fx', 'experimental', 'deactivate']) diff --git a/tests/github/experimental/workspace/testcase_datastore_cli/src/testflow_datastore_cli.py b/tests/github/experimental/workspace/testcase_datastore_cli/src/testflow_datastore_cli.py index 4ca2695c5c4..4517ef0e875 100644 --- a/tests/github/experimental/workspace/testcase_datastore_cli/src/testflow_datastore_cli.py +++ b/tests/github/experimental/workspace/testcase_datastore_cli/src/testflow_datastore_cli.py @@ -255,8 +255,8 @@ def validate_datastore_cli(flow_obj, expected_flow_steps, num_rounds): No issues found and below are 
the tests that ran successfully 1. Datastore steps and expected steps are matching 2. Task stdout and task stderr verified through metaflow cli is as expected - 3. Number of tasks are aligned with number of rounds and number\ - of collaborators {Bcolors.ENDC}""") + 3. Number of tasks are aligned with number of rounds and number """ + f"""of collaborators {Bcolors.ENDC}""") def display_validate_errors(validate_flow_error): diff --git a/tests/github/experimental/workspace/utils.py b/tests/github/experimental/workspace/utils.py new file mode 100644 index 00000000000..3863b006bf2 --- /dev/null +++ b/tests/github/experimental/workspace/utils.py @@ -0,0 +1,142 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import shutil +from subprocess import check_call +import os +from pathlib import Path +import re +import tarfile + + +def create_collaborator(col, workspace_root, archive_name, fed_workspace): + # Copy workspace to collaborator directories (these can be on different machines) + col_path = workspace_root / col + shutil.rmtree(col_path, ignore_errors=True) # Remove any existing directory + col_path.mkdir() # Create a new directory for the collaborator + + # Import the workspace to this collaborator + check_call( + ['fx', 'workspace', 'import', '--archive', workspace_root / archive_name], + cwd=col_path + ) + + # Create collaborator certificate request + check_call( + ['fx', 'collaborator', 'create', '-n', col, '--silent'], + cwd=col_path / fed_workspace + ) + # Remove '--silent' if you run this manually + check_call( + ['fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent'], + cwd=col_path / fed_workspace + ) + + # Sign collaborator certificate + # Remove '--silent' if you run this manually + request_pkg = col_path / fed_workspace / f'col_{col}_to_agg_cert_request.zip' + check_call( + ['fx', 'collaborator', 'certify', '--request-pkg', str(request_pkg), '--silent'], + cwd=workspace_root) + + # Import the signed 
certificate from the aggregator + import_path = workspace_root / f'agg_to_col_{col}_signed_cert.zip' + check_call( + ['fx', 'collaborator', 'certify', '--import', import_path], + cwd=col_path / fed_workspace + ) + + +def create_certified_workspace(path, template, fqdn, rounds_to_train): + shutil.rmtree(path, ignore_errors=True) + check_call(['fx', 'workspace', 'create', '--prefix', path, '--template', template]) + os.chdir(path) + + # Initialize FL plan + check_call(['fx', 'plan', 'initialize', '-a', fqdn]) + plan_path = Path('plan/plan.yaml') + try: + rounds_to_train = int(rounds_to_train) + with open(plan_path, "r", encoding='utf-8') as sources: + lines = sources.readlines() + with open(plan_path, "w", encoding='utf-8') as sources: + for line in lines: + sources.write( + re.sub(r'rounds_to_train.*', f'rounds_to_train: {rounds_to_train}', line) + ) + except (ValueError, TypeError): + pass + # Create certificate authority for workspace + check_call(['fx', 'workspace', 'certify']) + + # Export FL workspace + check_call(['fx', 'workspace', 'export']) + + +def certify_aggregator(fqdn): + # Create aggregator certificate + check_call(['fx', 'aggregator', 'generate-cert-request', '--fqdn', fqdn]) + + # Sign aggregator certificate + check_call(['fx', 'aggregator', 'certify', '--fqdn', fqdn, '--silent']) + + +def create_signed_cert_for_collaborator(col, data_path): + ''' + We do certs exchage for all participants in a single workspace to speed up this test run. 
+    Do not do this in real experiments in untrusted environments
+    '''
+    print(f'Certifying collaborator {col} with data path {data_path}...')
+    # Create collaborator certificate request
+    check_call([
+        'fx', 'collaborator', 'create', '-d', data_path, '-n', col, '--silent'
+    ])
+    check_call([
+        'fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent'
+    ])
+    # Sign collaborator certificate
+    check_call([
+        'fx',
+        'collaborator',
+        'certify',
+        '--request-pkg',
+        f'col_{col}_to_agg_cert_request.zip',
+        '--silent'
+    ])
+
+    # Pack the collaborator's private key and the signed cert
+    # as well as its data.yaml to a tarball
+    tarfiles = ['plan/data.yaml', f'agg_to_col_{col}_signed_cert.zip']
+    with os.scandir('cert/client') as iterator:
+        for entry in iterator:
+            if entry.name.endswith('key'):
+                tarfiles.append(entry.path)
+    with tarfile.open(f'cert_col_{col}.tar', 'w') as t:
+        for f in tarfiles:
+            t.add(f)
+    for f in tarfiles:
+        os.remove(f)
+    # Remove request archive
+    os.remove(f'col_{col}_to_agg_cert_request.zip')
+
+
+def start_aggregator_container(workspace_image_name, aggregator_required_files):
+    check_call(
+        'docker run --rm '
+        '--network host '
+        f'-v {Path.cwd().resolve()}/{aggregator_required_files}:/certs.tar '
+        '-e \"CONTAINER_TYPE=aggregator\" '
+        f'{workspace_image_name} '
+        'bash /openfl/openfl-docker/start_actor_in_container.sh',
+        shell=True)
+
+
+def start_collaborator_container(workspace_image_name, col_name):
+    check_call(
+        'docker run --rm '
+        '--network host '
+        f'-v {Path.cwd()}/cert_col_{col_name}.tar:/certs.tar '
+        '-e \"CONTAINER_TYPE=collaborator\" '
+        f'-e \"COL={col_name}\" '
+        f'{workspace_image_name} '
+        'bash /openfl/openfl-docker/start_actor_in_container.sh',
+        shell=True)