Skip to content

Commit

Permalink
Files added required to test experimental aggregator based workflow b…
Browse files Browse the repository at this point in the history
…y GitHub actions

Signed-off-by: Parth Mandaliya <parthx.mandaliya@intel.com>
  • Loading branch information
ParthM-GitHub committed Sep 28, 2023
1 parent 4545663 commit 2f4c26e
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/taskrunner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ jobs:
- name: Test TaskRunner API
run: |
python -m tests.github.test_hello_federation --template keras_cnn_mnist --fed_workspace aggregator --col1 col1 --col2 col2 --rounds-to-train 3 --save-model output_model
- name: Test Experimental Aggregator Based Workflow API
run: |
python -m tests.github.experimental.workspace.test_experimental_agg_based_workflow --template testcase_datastore_cli --fed_workspace aggregator --col col1 --col col2 --rounds-to-train 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import time
import socket
import argparse
from pathlib import Path
from subprocess import check_call
from concurrent.futures import ProcessPoolExecutor
from sys import executable
import shutil
from openfl.utilities.utils import rmtree
from tests.github.experimental.workspace.utils import create_collaborator
from tests.github.experimental.workspace.utils import create_certified_workspace
from tests.github.experimental.workspace.utils import certify_aggregator


if __name__ == '__main__':
    # End-to-end test of the experimental aggregator-based workflow API:
    # build a certified workspace from a test-case template, run an
    # aggregator plus collaborators in parallel, then tear everything down.
    parser = argparse.ArgumentParser()
    workspace_choice = []
    # Every directory under the experimental workspace dir (except the
    # infrastructure entries listed below) is a selectable test-case template.
    with os.scandir('tests/github/experimental/workspace') as iterator:
        for entry in iterator:
            if entry.name not in ['__init__.py', 'workspace', 'default']:
                workspace_choice.append(entry.name)
    parser.add_argument('--template', default='testcase_include_exclude', choices=workspace_choice)
    parser.add_argument('--fed_workspace', default='fed_work12345alpha81671')
    # Repeatable flag: each '--col NAME' appends one collaborator name.
    parser.add_argument('--col', action='append', default=[])
    parser.add_argument('--rounds-to-train')

    origin_dir = Path.cwd().resolve()
    args = parser.parse_args()
    fed_workspace = args.fed_workspace
    archive_name = f'{fed_workspace}.zip'
    fqdn = socket.getfqdn()
    template = args.template
    rounds_to_train = args.rounds_to_train
    collaborators = args.col
    # START
    # =====
    # Make sure you are in a Python virtual environment with the FL package installed.

    # Copy the chosen test template into the package's workspace directory so
    # 'fx workspace create' can find it after reinstallation.
    source_directory = origin_dir / 'tests'/'github'/'experimental'/'workspace' / template
    destination_directory = origin_dir / 'openfl-workspace' / 'experimental' / template
    if os.path.exists(destination_directory):
        shutil.rmtree(destination_directory)

    # Copy template to the destination directory
    shutil.copytree(src=source_directory, dst=destination_directory)

    # Reinstall the package so the copied template is picked up.
    check_call([executable, '-m', 'pip', 'install', '.'])

    # Activate the experimental features of the 'fx' CLI.
    check_call(['fx', 'experimental', 'activate'])

    create_certified_workspace(fed_workspace, template, fqdn, rounds_to_train)
    certify_aggregator(fqdn)

    # Get the absolute directory path for the workspace
    # (create_certified_workspace changed the cwd into it).
    workspace_root = Path().resolve()

    # Create Collaborators
    for collab in collaborators:
        create_collaborator(
            collab, workspace_root, archive_name, fed_workspace
        )

    # Run the federation: one process for the aggregator, one per collaborator.
    with ProcessPoolExecutor(max_workers=len(collaborators) + 1) as executor:
        executor.submit(
            check_call, ['fx', 'aggregator', 'start'], cwd=workspace_root
        )
        # Give the aggregator a head start before collaborators connect.
        time.sleep(5)

        for collab in collaborators:
            col_dir = workspace_root / collab / fed_workspace
            executor.submit(
                check_call, ['fx', 'collaborator', 'start', '-n', collab],
                cwd=col_dir
            )

    # Cleanup: return to the original directory and remove the workspace.
    os.chdir(origin_dir)
    rmtree(workspace_root)

    # Remove the template from the destination directory
    shutil.rmtree(destination_directory)

    # Deactivate experimental features to leave a clean environment.
    check_call(['fx', 'experimental', 'deactivate'])
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ def validate_datastore_cli(flow_obj, expected_flow_steps, num_rounds):
No issues found and below are the tests that ran successfully
1. Datastore steps and expected steps are matching
2. Task stdout and task stderr verified through metaflow cli is as expected
3. Number of tasks are aligned with number of rounds and number\
of collaborators {Bcolors.ENDC}""")
3. Number of tasks are aligned with number of rounds and number """
f"""of collaborators {Bcolors.ENDC}""")


def display_validate_errors(validate_flow_error):
Expand Down
142 changes: 142 additions & 0 deletions tests/github/experimental/workspace/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import shutil
from subprocess import check_call
import os
from pathlib import Path
import re
import tarfile


def create_collaborator(col, workspace_root, archive_name, fed_workspace):
    """Set up one collaborator: import the workspace and exchange certificates.

    Each collaborator gets its own directory under *workspace_root* (in a
    real deployment these could live on different machines).
    """
    # Start from a clean, empty directory for this collaborator.
    collab_dir = workspace_root / col
    shutil.rmtree(collab_dir, ignore_errors=True)
    collab_dir.mkdir()

    # Import the exported workspace archive into the collaborator directory.
    check_call(
        ['fx', 'workspace', 'import', '--archive', workspace_root / archive_name],
        cwd=collab_dir
    )

    collab_workspace = collab_dir / fed_workspace

    # Create the collaborator and its certificate request.
    # '--silent' suppresses interactive prompts; drop it when running by hand.
    check_call(
        ['fx', 'collaborator', 'create', '-n', col, '--silent'],
        cwd=collab_workspace
    )
    check_call(
        ['fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent'],
        cwd=collab_workspace
    )

    # The aggregator signs the collaborator's certificate request...
    cert_request = collab_workspace / f'col_{col}_to_agg_cert_request.zip'
    check_call(
        ['fx', 'collaborator', 'certify', '--request-pkg', str(cert_request), '--silent'],
        cwd=workspace_root
    )

    # ...and the collaborator imports the signed certificate back.
    signed_cert = workspace_root / f'agg_to_col_{col}_signed_cert.zip'
    check_call(
        ['fx', 'collaborator', 'certify', '--import', signed_cert],
        cwd=collab_workspace
    )


def create_certified_workspace(path, template, fqdn, rounds_to_train):
    """Create, initialize, certify, and export an FL workspace.

    NOTE: changes the process working directory into *path* and leaves it
    there — callers rely on this.
    """
    # Recreate the workspace from scratch.
    shutil.rmtree(path, ignore_errors=True)
    check_call(['fx', 'workspace', 'create', '--prefix', path, '--template', template])
    os.chdir(path)

    # Initialize FL plan
    check_call(['fx', 'plan', 'initialize', '-a', fqdn])
    plan_path = Path('plan/plan.yaml')
    try:
        num_rounds = int(rounds_to_train)
    except (ValueError, TypeError):
        # rounds_to_train missing or non-numeric: keep the plan's default.
        pass
    else:
        # Patch the 'rounds_to_train' setting in place ('.' never crosses
        # newlines, so this rewrites exactly the matching lines).
        plan_text = plan_path.read_text(encoding='utf-8')
        plan_text = re.sub(
            r'rounds_to_train.*', f'rounds_to_train: {num_rounds}', plan_text
        )
        plan_path.write_text(plan_text, encoding='utf-8')

    # Create certificate authority for workspace
    check_call(['fx', 'workspace', 'certify'])

    # Export FL workspace
    check_call(['fx', 'workspace', 'export'])


def certify_aggregator(fqdn):
    """Generate and sign the aggregator certificate for *fqdn*."""
    commands = (
        # Create aggregator certificate request
        ['fx', 'aggregator', 'generate-cert-request', '--fqdn', fqdn],
        # Sign aggregator certificate
        ['fx', 'aggregator', 'certify', '--fqdn', fqdn, '--silent'],
    )
    for cmd in commands:
        check_call(cmd)


def create_signed_cert_for_collaborator(col, data_path):
    '''
    Create, sign, and package a collaborator certificate.

    We do the certs exchange for all participants in a single workspace to
    speed up this test run. Do not do this in real experiments in untrusted
    environments.
    '''
    print(f'Certifying collaborator {col} with data path {data_path}...')
    # Create the collaborator, generate its cert request, and sign it.
    for cmd in (
        ['fx', 'collaborator', 'create', '-d', data_path, '-n', col, '--silent'],
        ['fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent'],
        [
            'fx', 'collaborator', 'certify',
            '--request-pkg', f'col_{col}_to_agg_cert_request.zip',
            '--silent',
        ],
    ):
        check_call(cmd)

    # Bundle the signed cert, the collaborator's private key(s), and its
    # data.yaml into one tarball, then delete the loose files.
    bundle = ['plan/data.yaml', f'agg_to_col_{col}_signed_cert.zip']
    with os.scandir('cert/client') as entries:
        bundle.extend(e.path for e in entries if e.name.endswith('key'))
    with tarfile.open(f'cert_col_{col}.tar', 'w') as archive:
        for member in bundle:
            archive.add(member)
    for member in bundle:
        os.remove(member)
    # Remove request archive
    os.remove(f'col_{col}_to_agg_cert_request.zip')


def start_aggregator_container(workspace_image_name, aggregator_required_files):
    """Run the aggregator inside a Docker container.

    Mounts *aggregator_required_files* (a tarball path relative to the
    current directory) as /certs.tar and starts the actor script.
    Blocks until the container exits; raises CalledProcessError on failure.
    """
    # Argument list with shell=False instead of an interpolated shell
    # string: no word-splitting if the path contains spaces, and no shell
    # injection through the arguments. The docker invocation is unchanged.
    check_call([
        'docker', 'run', '--rm',
        '--network', 'host',
        '-v', f'{Path.cwd().resolve()}/{aggregator_required_files}:/certs.tar',
        '-e', 'CONTAINER_TYPE=aggregator',
        workspace_image_name,
        'bash', '/openfl/openfl-docker/start_actor_in_container.sh',
    ])


def start_collaborator_container(workspace_image_name, col_name):
    """Run collaborator *col_name* inside a Docker container.

    Mounts the collaborator's certificate tarball as /certs.tar and starts
    the actor script. Blocks until the container exits; raises
    CalledProcessError on failure.
    """
    # Argument list with shell=False instead of an interpolated shell
    # string (see start_aggregator_container). Path.cwd().resolve() is used
    # for consistency with the aggregator variant; cwd is already absolute,
    # resolve() only canonicalizes symlinks for the 'docker -v' mount.
    check_call([
        'docker', 'run', '--rm',
        '--network', 'host',
        '-v', f'{Path.cwd().resolve()}/cert_col_{col_name}.tar:/certs.tar',
        '-e', 'CONTAINER_TYPE=collaborator',
        '-e', f'COL={col_name}',
        workspace_image_name,
        'bash', '/openfl/openfl-docker/start_actor_in_container.sh',
    ])

0 comments on commit 2f4c26e

Please sign in to comment.