diff --git a/docs/mermaid/director_aggregator.mmd b/docs/mermaid/director_aggregator.mmd new file mode 100644 index 0000000000..69cf6f11df --- /dev/null +++ b/docs/mermaid/director_aggregator.mmd @@ -0,0 +1,22 @@ +sequenceDiagram + participant D as Director + participant A as Aggregator + rect rgb(0, 255, 0,.1) + Note over D: An Experiment's start + D->>D: Get next experiment
from Experiment Registry + opt Docker specific logic + D->>D: Create aggregator docker build
context from experiment workspace.
(Add Dockerfile and execution script
to context specific for aggregator) + D->>D: Build aggregator docker image + D->>D: Create aggregator docker container + D->>D: Run aggregator docker container + D->>D: Monitor aggregator docker container + end + loop every round + A->>D: Send last/best model to director + D->>D: Save model on director + end + opt Docker specific logic + D->>D: Delete aggregator docker container
when experiment was finished + end + end + Note over D: The Experiment ended.
The Federation keeps existing. \ No newline at end of file diff --git a/docs/mermaid/director_envoy.mmd b/docs/mermaid/director_envoy.mmd new file mode 100644 index 0000000000..00bda8ac0e --- /dev/null +++ b/docs/mermaid/director_envoy.mmd @@ -0,0 +1,55 @@ +sequenceDiagram + participant N as NoteBook + participant D as Director + participant E as Envoy + rect rgb(0, 255, 0,.1) + Note over D,E: A Federation startup process + D->D: Starts + E->E: Adapting a dataset + E->E: Starts + Note over D,E: Exchange certs + E-->>D: Connects using FQDN and pwd + E-->>D: Communicates dataset info to + D-->D: Keeps a list of connected Envoys + D-->D: Ensures unified data interface + end + Note over D,E: We consider a Federation set up + rect rgb(0, 255, 0,.1) + Note over N,D: Create new experiment + N->>N: Prepare experiment in Notebook + N->>N: Connect to federation + N->>N: Run experiment + N->>D: Send Experiment workspace + D->>D: Create new experiment + D->>D: Add experiment to registry + end + rect rgb(0, 255, 0,.1) + Note over D,E: An Experiment's start + D->>D: Get next experiment
from Experiment Registry + opt Docker specific logic + D->>D: Create aggregator docker build
context from experiment workspace.
(Add Dockerfile and execution script
to context specific for aggregator) + D->>D: Build aggregator docker image + D->>D: Create aggregator docker container + D->>D: Run aggregator docker container + D->>D: Monitor aggregator docker container + end + E->>D: WaitExperiment + D-->>E: Send Experiment name + E->>D: GetExperimentData(experiment_name) + D-->>E: Send Experiment workspace + opt Docker specific logic + E->>E: Create collaborator docker build
context from Experiment workspace.
(Add Dockerfile and execution script
to context specific for collaborator) + E->>E: Build collaborator docker image + E->>E: Create collaborator docker container + E->>E: Run collaborator docker container + E->>E: Monitor collaborator docker container + end + Note over D,E: Wait for last round finished + opt Docker specific logic + E->>E: Delete collaborator docker container
when experiment was finished + D->>D: Delete aggregator docker container
when experiment was finished + end + end + N->>D: Get best model + D-->>N: Send best model + Note over D,E: The Experiment ended.
The Federation keeps existing. \ No newline at end of file diff --git a/openfl-docker/Dockerfile.aggregator b/openfl-docker/Dockerfile.aggregator new file mode 100644 index 0000000000..ac466b2865 --- /dev/null +++ b/openfl-docker/Dockerfile.aggregator @@ -0,0 +1,7 @@ +FROM python:3.8 + +RUN pip install --upgrade pip +RUN pip install git+https://github.com/dmitryagapov/openfl.git@feature/run_collaborator_in_docker + +COPY . /code +WORKDIR /code \ No newline at end of file diff --git a/openfl-docker/Dockerfile.collaborator b/openfl-docker/Dockerfile.collaborator new file mode 100644 index 0000000000..8c152da1c9 --- /dev/null +++ b/openfl-docker/Dockerfile.collaborator @@ -0,0 +1,10 @@ +FROM python:3.8 + +RUN pip install --upgrade pip +RUN pip install git+https://github.com/dmitryagapov/openfl.git@feature/run_collaborator_in_docker + +WORKDIR /code +COPY ./requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . diff --git a/openfl-tutorials/interactive_api/MXNet_landmarks/director/start_director_docker.sh b/openfl-tutorials/interactive_api/MXNet_landmarks/director/start_director_docker.sh new file mode 100755 index 0000000000..265548d3f8 --- /dev/null +++ b/openfl-tutorials/interactive_api/MXNet_landmarks/director/start_director_docker.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +fx director start --disable-tls -c director_config.yaml --use-docker diff --git a/openfl-tutorials/interactive_api/MXNet_landmarks/envoy/start_envoy_docker.sh b/openfl-tutorials/interactive_api/MXNet_landmarks/envoy/start_envoy_docker.sh new file mode 100755 index 0000000000..72a15413ed --- /dev/null +++ b/openfl-tutorials/interactive_api/MXNet_landmarks/envoy/start_envoy_docker.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e +ENVOY_NAME=$1 +SHARD_CONF=$2 + +fx envoy start -n "$ENVOY_NAME" --disable-tls --envoy-config-path "$SHARD_CONF" -dh localhost -dp 50051 diff --git a/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml 
b/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml index 0e997c2edd..65d416eb3f 100644 --- a/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml +++ b/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml @@ -1,13 +1,16 @@ params: - cuda_devices: [0] - + cuda_devices: [ 0 ] + optional_plugin_components: - cuda_device_monitor: - template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor - settings: [] + cuda_device_monitor: + template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor + settings: [ ] shard_descriptor: template: dogs_cats_shard_descriptor.DogsCatsShardDescriptor + volumes: + - '~/.kaggle/kaggle.json' + - './data' params: data_folder: data rank_worldsize: 1,2 diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml index 860e0436f5..86849ed0ca 100644 --- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml @@ -1,6 +1,15 @@ settings: listen_host: localhost listen_port: 50050 - sample_shape: ['300', '400', '3'] - target_shape: ['300', '400'] + sample_shape: [ '300', '400', '3' ] + target_shape: [ '300', '400' ] envoy_health_check_period: 5 # in seconds + docker: + env: + http_proxy: + https_proxy: + no_proxy: + buildargs: + HTTP_PROXY: + HTTPS_PROXY: + NO_PROXY: diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/start_director_docker.sh b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/start_director_docker.sh new file mode 100755 index 0000000000..ce839b9765 --- /dev/null +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/start_director_docker.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +fx director start 
--disable-tls -c director_config.yaml --use-docker \ No newline at end of file diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml index aae095f4e7..09fca7774f 100644 --- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml @@ -1,10 +1,21 @@ params: - cuda_devices: [0,2] - + cuda_devices: [ 0, 2 ] + docker: + env: + http_proxy: + https_proxy: + no_proxy: + buildargs: + HTTP_PROXY: + HTTPS_PROXY: + NO_PROXY: + volumes: + - './kvasir_data' + optional_plugin_components: - cuda_device_monitor: - template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor - settings: [] + cuda_device_monitor: + template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor + settings: [ ] shard_descriptor: template: kvasir_shard_descriptor.KvasirShardDescriptor @@ -12,3 +23,5 @@ shard_descriptor: data_folder: kvasir_data rank_worldsize: 1,10 enforce_image_hw: '300,400' + + diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml index 1c121e534a..ed99b0de46 100644 --- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml @@ -1,7 +1,11 @@ params: - cuda_devices: [] + cuda_devices: [ ] + docker_env: + http_proxy: + https_proxy: + no_proxy: -optional_plugin_components: {} +optional_plugin_components: { } shard_descriptor: template: kvasir_shard_descriptor.KvasirShardDescriptor diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/start_envoy_docker.sh b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/start_envoy_docker.sh new file mode 100755
index 0000000000..e8249f1b24 --- /dev/null +++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/start_envoy_docker.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +fx envoy start -n env_one --disable-tls --envoy-config-path envoy_config.yaml -dh localhost -dp 50050 --use-docker diff --git a/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py b/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py index 12bfed9df4..6c47172618 100644 --- a/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py +++ b/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py @@ -33,7 +33,7 @@ def get_dataset(self, dataset_type: str) -> np.ndarray: """ Return a shard dataset by type. - A simple list with elements (x, y) implemets the Shard Dataset interface. + A simple list with elements (x, y) implements the Shard Dataset interface. """ if dataset_type == 'train': return self.data[:self.n_samples // 2] diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index dd1fc591ea..c95515c659 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -4,7 +4,9 @@ """Aggregator module.""" import queue from logging import getLogger +from pathlib import Path from typing import List +from typing import Union from openfl.component.aggregation_functions import WeightedAverage from openfl.databases import TensorDB @@ -12,6 +14,7 @@ from openfl.pipelines import TensorCodec from openfl.protocols import base_pb2 from openfl.protocols import utils +from openfl.transport.grpc.director_client import DirectorClient from openfl.utilities import TaskResultKey from openfl.utilities import TensorKey from openfl.utilities.logs import write_metric @@ -33,24 +36,33 @@ class Aggregator: \* - plan setting. 
""" - def __init__(self, - - aggregator_uuid, - federation_uuid, - authorized_cols, - - init_state_path, - best_state_path, - last_state_path, - - assigner, - - rounds_to_train=256, - single_col_cert_common_name=None, - compression_pipeline=None, - db_store_rounds=1, - write_logs=False, - **kwargs): + def __init__( + self, + + aggregator_uuid, + federation_uuid, + authorized_cols, + + init_state_path, + best_state_path, + last_state_path, + + assigner, + + director_host, + director_port, + experiment_name: str = None, + rounds_to_train=256, + single_col_cert_common_name=None, + compression_pipeline=None, + db_store_rounds=1, + write_logs=False, + tls: bool = False, + root_certificate: Union[Path, str] = None, + private_key: Union[Path, str] = None, + certificate: Union[Path, str] = None, + **kwargs + ) -> None: """Initialize.""" self.round_number = 0 self.single_col_cert_common_name = single_col_cert_common_name @@ -110,6 +122,17 @@ def __init__(self, self.collaborator_tasks_results = {} # {TaskResultKey: list of TensorKeys} self.collaborator_task_weight = {} # {TaskResultKey: data_size} + self.experiment_name = experiment_name + + self.director_client = DirectorClient( + client_id=f'aggregator_{experiment_name}', + director_host=director_host, + director_port=director_port, + tls=tls, + root_certificate=root_certificate, + private_key=private_key, + certificate=certificate, + ) def _load_initial_tensors(self): """ @@ -517,7 +540,8 @@ def send_local_task_results(self, collaborator_name, round_number, task_name, 'task_name': task_name, 'metric_name': tensor_key.tensor_name, 'metric_value': metric_value, - 'round': round_number} + 'round': round_number + } if self.write_logs: self.log_metric(tensor_key.tags[-1], task_name, tensor_key.tensor_name, nparray, round_number) @@ -607,10 +631,14 @@ def _process_named_tensor(self, named_tensor, collaborator_name): The numpy array associated with the returned tensorkey """ raw_bytes = named_tensor.data_bytes - metadata = 
[{'int_to_float': proto.int_to_float, - 'int_list': proto.int_list, - 'bool_list': proto.bool_list} - for proto in named_tensor.transformer_metadata] + metadata = [ + { + 'int_to_float': proto.int_to_float, + 'int_list': proto.int_list, + 'bool_list': proto.bool_list, + } + for proto in named_tensor.transformer_metadata + ] # The tensor has already been transfered to aggregator, # so the newly constructed tensor should have the aggregator origin tensor_key = TensorKey( @@ -793,7 +821,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result # Finally, cache the updated model tensor self.tensor_db.cache_tensor({final_model_tk: new_model_nparray}) - def _compute_validation_related_task_metrics(self, task_name): + def _compute_validation_related_task_metrics(self, task_name) -> bool: """ Compute all validation related metrics. @@ -801,6 +829,7 @@ def _compute_validation_related_task_metrics(self, task_name): task_name : str The task name to compute """ + is_best_model = False # By default, print out all of the metrics that the validation # task sent # This handles getting the subset of collaborators that may be @@ -845,7 +874,8 @@ def _compute_validation_related_task_metrics(self, task_name): 'task_name': task_name, 'metric_name': tensor_key.tensor_name, 'metric_value': agg_results.item(), - 'round': round_number} + 'round': round_number, + } if agg_results is None: self.logger.warning( @@ -872,8 +902,25 @@ def _compute_validation_related_task_metrics(self, task_name): f'model with score {agg_results:f}') self.best_model_score = agg_results self._save_model(round_number, self.best_state_path) + is_best_model = True if 'trained' in tags: self._prepare_trained(tensor_name, origin, round_number, report, agg_results) + return is_best_model + + def upload_model_to_aggregator(self, model_type: str = 'last'): + if model_type not in ['last', 'best']: + raise ValueError( + f'Invalid {model_type=} for upload_model_to_aggregator function. 
' + f'Allowed values "last", "best"' + ) + model_proto = utils.construct_model_proto( + self.last_tensor_dict, 0, NoCompressionPipeline() + ) + self.director_client.upload_experiment_model( + experiment_name=self.experiment_name, + model_proto=model_proto, + model_type=model_type + ) def _end_of_round_check(self): """ @@ -894,8 +941,11 @@ def _end_of_round_check(self): # Compute all validation related metrics all_tasks = self.assigner.get_all_tasks_for_round(self.round_number) + is_best_model = False for task_name in all_tasks: - self._compute_validation_related_task_metrics(task_name) + _is_best_model = self._compute_validation_related_task_metrics(task_name) + if _is_best_model: + is_best_model = True # Once all of the task results have been processed self.round_number += 1 @@ -903,6 +953,8 @@ def _end_of_round_check(self): # Save the latest model self.logger.info(f'Saving round {self.round_number} model...') self._save_model(self.round_number, self.last_state_path) + model_type = 'best' if is_best_model else 'last' + self.upload_model_to_aggregator(model_type=model_type) # TODO This needs to be fixed! 
if self._time_to_quit(): @@ -943,7 +995,9 @@ def _log_big_warning(self): def stop(self, failed_collaborator: str = None) -> None: """Stop aggregator execution.""" self.logger.info('Force stopping the aggregator execution.') - for collaborator_name in filter(lambda c: c != failed_collaborator, self.authorized_cols): + + for collaborator_name in filter(lambda c: c != failed_collaborator, + self.authorized_cols): self.logger.info(f'Sending signal to collaborator {collaborator_name} to shutdown...') self.quit_job_sent_to.append(collaborator_name) diff --git a/openfl/component/director/director.py b/openfl/component/director/director.py index 8cd0689531..fadaacf328 100644 --- a/openfl/component/director/director.py +++ b/openfl/component/director/director.py @@ -1,17 +1,24 @@ -# Copyright (C) 2020-2021 Intel Corporation +# Copyright (C) 2020-2022 Intel Corporation # SPDX-License-Identifier: Apache-2.0 """Director module.""" import asyncio import logging +import pickle +import shutil import time +import uuid from collections import defaultdict from pathlib import Path +from tarfile import TarFile from typing import Iterable +from typing import List from typing import Union from typing import ValuesView +from openfl.docker.docker import DockerConfig +from openfl.federated import Plan from openfl.protocols import base_pb2 from openfl.transport import AsyncAggregatorGRPCClient from .experiment import Experiment @@ -28,13 +35,16 @@ class Director: def __init__( self, *, + director_host, + director_port, tls: bool = True, root_certificate: Union[Path, str] = None, private_key: Union[Path, str] = None, certificate: Union[Path, str] = None, sample_shape: list = None, target_shape: list = None, - settings: dict = None + settings: dict = None, + docker_config: DockerConfig, ) -> None: """Initialize a director object.""" self.sample_shape, self.target_shape = sample_shape, target_shape @@ -47,6 +57,9 @@ def __init__( self.settings = settings or {} self.col_exp_queues = 
defaultdict(asyncio.Queue) self.col_exp = {} + self.director_host = director_host + self.director_port = director_port + self.docker_config = docker_config def acknowledge_shard(self, shard_info: dict) -> bool: """Save shard info to shard registry if it's acceptable.""" @@ -76,13 +89,34 @@ async def set_new_experiment( experiment_archive_path: Path, ) -> bool: """Set new experiment.""" + tensor_dict_path = Path('tmp') / f'{uuid.uuid4()}' / f'{experiment_name}.pickle' + tensor_dict_path = tensor_dict_path.absolute() + tensor_dict_path.parent.mkdir(parents=True, exist_ok=True) + + with tensor_dict_path.open('wb') as f: + pickle.dump(tensor_dict, f) + plan = self._parse_plan(experiment_archive_path) + aggregator_client = AsyncAggregatorGRPCClient( + agg_addr=plan.agg_addr, + agg_port=plan.agg_port, + tls=self.tls, + disable_client_auth=not self.tls, + root_certificate=self.root_certificate, + certificate=self.certificate, + private_key=self.private_key + ) experiment = Experiment( name=experiment_name, archive_path=experiment_archive_path, collaborators=list(collaborator_names), users=[sender_name], sender=sender_name, - init_tensor_dict=tensor_dict, + init_tensor_dict_path=tensor_dict_path, + docker_config=self.docker_config, + director_host=self.director_host, + director_port=self.director_port, + plan=plan, + aggregator_client=aggregator_client, ) self.experiments_registry.add(experiment) return True @@ -90,8 +124,6 @@ async def set_new_experiment( async def get_aggregator_client(self, experiment_name): """Return an aggregator client for the experiment.""" exp = self.experiments_registry[experiment_name] - while exp.status != Status.IN_PROGRESS: - await asyncio.sleep(1) agg_port = exp.plan.agg_port agg_addr = exp.plan.agg_addr logger.info(f'Aggregator uri: {agg_addr}:{agg_port}') @@ -114,16 +146,16 @@ async def get_trained_model(self, experiment_name: str, caller: str, model_type: logger.error('No experiment data in the stash') return None exp = 
self.experiments_registry[experiment_name] - if exp.status != Status.IN_PROGRESS: - return None - aggregator_client = await self.get_aggregator_client(experiment_name) - trained_model = await aggregator_client.get_trained_model( - experiment_name, - model_type - ) - - return trained_model + if model_type == 'last': + return exp.last_tensor_dict + elif model_type == 'best': + return exp.best_tensor_dict + else: + raise ValueError( + f'Invalid value {model_type=} in function get_trained_model. ' + f'Allowed values: "last", "best"' + ) def get_experiment_data(self, experiment_name: str) -> Path: """Get experiment data.""" @@ -165,15 +197,14 @@ async def stream_metrics(self, experiment_name: str, caller: str): Raises: StopIteration - if the experiment is finished and there is no more metrics to report """ - if (experiment_name not in self.experiments_registry - or caller not in self.experiments_registry[experiment_name].users): + exp = self.experiments_registry.get(experiment_name) + if exp is None or caller not in exp.users: raise Exception( f'No experiment name "{experiment_name}" in experiments list, or caller "{caller}"' f' does not have access to this experiment' ) - aggregator_client = await self.get_aggregator_client(experiment_name) - async for metric_dict in aggregator_client.get_metric_stream(): + async for metric_dict in await exp.get_metric_stream(): yield metric_dict def remove_experiment_data(self, experiment_name: str, caller: str): @@ -182,11 +213,11 @@ def remove_experiment_data(self, experiment_name: str, caller: str): and caller in self.experiments_registry[experiment_name].users): self.experiments_registry.remove(experiment_name) - def set_experiment_failed(self, *, experiment_name: str, collaborator_name: str): + async def set_experiment_failed(self, *, experiment_name: str, collaborator_name: str): """Set experiment failed.""" - if experiment_name in self.experiments_registry: - aggregator = self.experiments_registry[experiment_name].aggregator - 
aggregator.stop(failed_collaborator=collaborator_name) + exp = self.experiments_registry.get(experiment_name) + if exp is not None: + await exp.stop(collaborator_name) def update_envoy_status( self, *, @@ -215,36 +246,25 @@ def get_envoys(self) -> ValuesView: """Get a status information about envoys.""" logger.info(f'Shard registry: {self._shard_registry}') for envoy_info in self._shard_registry.values(): - envoy_info['is_online'] = ( - time.time() < envoy_info.get('last_updated', 0) - + envoy_info.get('valid_duration', 0) - ) + last_updated = envoy_info.get('last_updated', 0) + valid_duration = envoy_info.get('valid_duration', 0) + envoy_info['is_online'] = time.time() < last_updated + valid_duration envoy_name = envoy_info['shard_info']['node_info']['name'] envoy_info['experiment_name'] = self.col_exp.get(envoy_name) return self._shard_registry.values() - async def get_experiments_list(self, caller: str) -> list: + async def get_experiments_list(self, caller: str) -> List[dict]: """Get experiments list for specific user.""" experiments = self.experiments_registry.get_user_experiments(caller) result = [] for exp in experiments: - exp_data = { - 'name': exp.name, - 'status': exp.status, - 'collaborators_amount': len(exp.collaborators), - } - if exp.status == Status.IN_PROGRESS: - aggregator_client = await self.get_aggregator_client(exp.name) - experiment_pb2 = await aggregator_client.get_experiment_description() - exp_data['progress'] = experiment_pb2.progress - exp_data['tasks_amount'] = len(experiment_pb2.tasks) - result.append(exp_data) - + description = await exp.get_description() + result.append(description) return result async def get_experiment_description(self, caller: str, experiment_name: str) -> dict: - """Get a experiment information by name for specific user.""" + """Get an experiment information by name for specific user.""" exp = self.experiments_registry.get(experiment_name) if not exp or caller not in exp.users: logger.info(f'Experiment 
{experiment_name} for user {caller} does not exist.') @@ -254,12 +274,8 @@ async def get_experiment_description(self, caller: str, experiment_name: str) -> name=exp.name, status=exp.status, ) - aggregator_client = await self.get_aggregator_client(experiment_name) - experiment_pb2 = await aggregator_client.get_experiment_description() - experiment_pb2.name = experiment_name - experiment_pb2.status = exp.status - - return experiment_pb2 + description = await exp.get_description() + return description async def start_experiment_execution_loop(self): """Run task to monitor and run experiments.""" @@ -276,3 +292,36 @@ async def start_experiment_execution_loop(self): queue = self.col_exp_queues[col_name] await queue.put(experiment.name) await run_aggregator_future + + async def upload_experiment_model( + self, + experiment_name: str, + tensor_dict: dict, + model_type: str, + ) -> None: + if model_type not in ['last', 'best']: + raise ValueError( + f'Invalid {model_type=} in upload_experiment_model function. 
' + f'Allowed values "last", "best"' + ) + exp = self.experiments_registry[experiment_name] + if model_type == 'last': + exp.last_tensor_dict = tensor_dict + if model_type == 'best': + exp.best_tensor_dict = tensor_dict + + @staticmethod + def _parse_plan(archive_path): + plan_path = Path('plan/plan.yaml') + with TarFile(name=archive_path, mode='r') as tar_file: + plan_buffer = tar_file.extractfile(f'./{plan_path}') + if plan_buffer is None: + raise Exception(f'No {plan_path} in workspace.') + plan_data = plan_buffer.read() + tmp_plan_path = Path('tmp') / f'{uuid.uuid4()}' / 'plan.yaml' + tmp_plan_path.parent.mkdir(parents=True, exist_ok=True) + with tmp_plan_path.open('wb') as plan_f: + plan_f.write(plan_data) + plan = Plan.parse(plan_config_path=tmp_plan_path) + shutil.rmtree(tmp_plan_path.parent, ignore_errors=True) + return plan diff --git a/openfl/component/director/experiment.py b/openfl/component/director/experiment.py index c3c6537a50..63b1969339 100644 --- a/openfl/component/director/experiment.py +++ b/openfl/component/director/experiment.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 Intel Corporation +# Copyright (C) 2021-2022 Intel Corporation # SPDX-License-Identifier: Apache-2.0 """Experiment module.""" @@ -7,12 +7,19 @@ import logging from contextlib import asynccontextmanager from pathlib import Path +from tarfile import TarFile from typing import Iterable from typing import List from typing import Union +import numpy as np +from google.protobuf.json_format import MessageToDict + +from openfl.docker import docker +from openfl.docker.docker import DockerConfig from openfl.federated import Plan from openfl.transport import AggregatorGRPCServer +from openfl.transport import AsyncAggregatorGRPCClient from openfl.utilities.workspace import ExperimentWorkspace logger = logging.getLogger(__name__) @@ -25,6 +32,8 @@ class Status: FINISHED = 'finished' IN_PROGRESS = 'in_progress' FAILED = 'failed' + DOCKER_BUILD_IN_PROGRESS = 'docker_build_in_progress' + 
DOCKER_CREATE_CONTAINER = 'docker_create_container' class Experiment: @@ -36,9 +45,14 @@ def __init__( archive_path: Union[Path, str], collaborators: List[str], sender: str, - init_tensor_dict: dict, + init_tensor_dict_path: Union[Path, str], + director_host: str, + director_port: str, + aggregator_client: AsyncAggregatorGRPCClient, + plan: Plan, plan_path: Union[Path, str] = 'plan/plan.yaml', users: Iterable[str] = None, + docker_config: DockerConfig, ) -> None: """Initialize an experiment object.""" self.name = name @@ -47,14 +61,17 @@ def __init__( self.archive_path = archive_path self.collaborators = collaborators self.sender = sender - self.init_tensor_dict = init_tensor_dict - if isinstance(plan_path, str): - plan_path = Path(plan_path) - self.plan_path = plan_path - self.plan = None + self.init_tensor_dict_path = Path(init_tensor_dict_path) + self.plan_path = Path(plan_path) + self.plan = plan self.users = set() if users is None else set(users) self.status = Status.PENDING - self.aggregator = None + self.aggregator_client = aggregator_client + self.director_host = director_host + self.director_port = director_port + self.last_tensor_dict = {} + self.best_tensor_dict = {} + self.docker_config = docker_config async def start( self, *, @@ -62,43 +79,118 @@ async def start( root_certificate: Union[Path, str] = None, private_key: Union[Path, str] = None, certificate: Union[Path, str] = None, - ): + ) -> None: """Run experiment.""" - self.status = Status.IN_PROGRESS try: logger.info(f'New experiment {self.name} for ' f'collaborators {self.collaborators}') - with ExperimentWorkspace(self.name, self.archive_path): - aggregator_grpc_server = self._create_aggregator_grpc_server( + if self.docker_config.use_docker: + await self._run_aggregator_in_docker( + data_file_path=self.archive_path, tls=tls, root_certificate=root_certificate, private_key=private_key, certificate=certificate, ) - self.aggregator = aggregator_grpc_server.aggregator - await 
self._run_aggregator_grpc_server( - aggregator_grpc_server=aggregator_grpc_server, - ) + else: + self.status = Status.IN_PROGRESS + with ExperimentWorkspace(self.name, self.archive_path): + aggregator_grpc_server = self._create_aggregator_grpc_server( + director_host=self.director_host, + director_port=self.director_port, + tls=tls, + root_certificate=root_certificate, + private_key=private_key, + certificate=certificate, + ) + await self._run_aggregator_grpc_server( + aggregator_grpc_server=aggregator_grpc_server, + ) + self.status = Status.FINISHED logger.info(f'Experiment "{self.name}" was finished successfully.') except Exception as e: self.status = Status.FAILED logger.exception(f'Experiment "{self.name}" was failed with error: {e}.') + async def stop(self, failed_collaborator: str = None): + await self.aggregator_client.stop(failed_collaborator=failed_collaborator) + + async def get_description(self) -> dict: + description = { + 'name': self.name, + 'status': self.status, + 'collaborators_amount': len(self.collaborators), + } + + if self.status == Status.IN_PROGRESS: + description_pb2 = await self.aggregator_client.get_experiment_description() + description = MessageToDict(description_pb2) + + return description + + async def get_metric_stream(self): + return self.aggregator_client.get_metric_stream() + + async def _run_aggregator_in_docker( + self, *, + data_file_path: Path, + tls: bool = False, + root_certificate: Union[Path, str] = None, + private_key: Union[Path, str] = None, + certificate: Union[Path, str] = None, + ) -> None: + self.status = Status.DOCKER_BUILD_IN_PROGRESS + docker_client = docker.Docker(config=self.docker_config) + docker_context_path = docker.create_aggregator_context( + data_file_path=data_file_path, + init_tensor_dict_path=self.init_tensor_dict_path, + ) + image_tag = await docker_client.build_image( + context_path=docker_context_path, + tag='aggregator', + ) + cmd = ( + f'python run.py ' + f'--experiment_name {self.name} ' + 
f'--director_host {self.director_host} ' + f'--director_port {self.director_port} ' + f'--init_tensor_dict_path init_tensor_dict.pickle ' + f'--collaborators {" ".join(self.collaborators)} ' + f'--root_certificate {root_certificate} ' + f'--private_key {private_key} ' + f'--certificate {certificate} ' + f'{"--tls " if tls else "--no-tls "}' + ) + self.status = Status.DOCKER_CREATE_CONTAINER + self.container = await docker_client.create_container( + name=f'{self.name.lower()}_aggregator', + image_tag=image_tag, + cmd=cmd, + ) + self.status = Status.IN_PROGRESS + await docker_client.start_and_monitor_container(container=self.container) + await self.container.delete(force=True) + def _create_aggregator_grpc_server( self, *, + director_host, + director_port, tls: bool = True, root_certificate: Union[Path, str] = None, private_key: Union[Path, str] = None, certificate: Union[Path, str] = None, ) -> AggregatorGRPCServer: - self.plan = Plan.parse(plan_config_path=Path(self.plan_path)) self.plan.authorized_cols = list(self.collaborators) logger.info('🧿 Starting the Aggregator Service.') + init_tensor_dict = np.load(str(self.init_tensor_dict_path), allow_pickle=True) aggregator_grpc_server = self.plan.interactive_api_get_server( - tensor_dict=self.init_tensor_dict, + experiment_name=self.name, + director_host=director_host, + director_port=director_port, + tensor_dict=init_tensor_dict, root_certificate=root_certificate, certificate=certificate, private_key=private_key, @@ -106,6 +198,19 @@ def _create_aggregator_grpc_server( ) return aggregator_grpc_server + def _parse_plan(self): + with TarFile(name=self.archive_path, mode='r') as tar_file: + plan_buffer = tar_file.extractfile(f'./{self.plan_path}') + if plan_buffer is None: + raise Exception(f'No {self.plan_path} in workspace.') + plan_data = plan_buffer.read() + local_plan_path = Path(self.name) / self.plan_path + local_plan_path.parent.mkdir(parents=True, exist_ok=True) + with local_plan_path.open('wb') as plan_f: + 
plan_f.write(plan_data) + plan = Plan.parse(plan_config_path=local_plan_path) + return plan + @staticmethod async def _run_aggregator_grpc_server(aggregator_grpc_server: AggregatorGRPCServer) -> None: """Run aggregator.""" @@ -135,6 +240,8 @@ def __init__(self) -> None: self.__pending_experiments = [] self.__archived_experiments = [] self.__dict = {} + self.last_tensor_dict = {} + self.best_tensor_dict = {} @property def active_experiment(self) -> Union[Experiment, None]: diff --git a/openfl/component/director/run_aggregator.py b/openfl/component/director/run_aggregator.py new file mode 100644 index 0000000000..e5bd9f8780 --- /dev/null +++ b/openfl/component/director/run_aggregator.py @@ -0,0 +1,102 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Script to run aggregator in docker.""" + +import argparse +import asyncio +import logging +from pathlib import Path + +import numpy as np + +from openfl.federated import Plan +from openfl.interface.cli import setup_logging + +logger = logging.getLogger(__name__) + +setup_logging() + +PLAN_PATH_DEFAULT = 'plan/plan.yaml' + + +def _parse_args(): + parser = argparse.ArgumentParser(description='Run aggregator.') + parser.add_argument('--plan_path', type=str, nargs='?', default=PLAN_PATH_DEFAULT) + + parser.add_argument('--experiment_name', type=str) + parser.add_argument('--director_host', type=str) + parser.add_argument('--director_port', type=int) + + parser.add_argument('--collaborators', type=str, nargs='+') + parser.add_argument('--init_tensor_dict_path', type=str) + parser.add_argument('--root_certificate', type=str, nargs='?', default=None) + parser.add_argument('--private_key', type=str, nargs='?', default=None) + parser.add_argument('--certificate', type=str, nargs='?', default=None) + parser.add_argument('--tls', dest='tls', action='store_true') + parser.add_argument('--no-tls', dest='tls', action='store_false') + parser.set_defaults(tls=True) + + return parser.parse_args() + + 
+async def main( + plan_path, + experiment_name, + director_host, + director_port, + collaborators, + init_tensor_dict_path, + root_certificate, + certificate, + private_key, + tls, +): + """Run main function.""" + plan = Plan.parse(plan_config_path=Path(plan_path)) + plan.authorized_cols = list(collaborators) + + logger.info('🧿 Starting the Aggregator Service.') + init_tensor_dict = np.load(init_tensor_dict_path, allow_pickle=True) + aggregator_grpc_server = plan.interactive_api_get_server( + experiment_name=experiment_name, + director_host=director_host, + director_port=director_port, + tensor_dict=init_tensor_dict, + root_certificate=root_certificate, + certificate=certificate, + private_key=private_key, + tls=tls, + ) + + logger.info('🧿 Starting the Aggregator Service.') + grpc_server = aggregator_grpc_server.get_server() + grpc_server.start() + logger.info('Starting Aggregator gRPC Server') + + try: + while not aggregator_grpc_server.aggregator.all_quit_jobs_sent(): + # Awaiting quit job sent to collaborators + await asyncio.sleep(10) + except KeyboardInterrupt: + pass + finally: + grpc_server.stop(0) + # Temporary solution to free RAM used by TensorDB + aggregator_grpc_server.aggregator.tensor_db.clean_up(0) + + +if __name__ == '__main__': + args = _parse_args() + asyncio.run(main( + plan_path=args.plan_path, + experiment_name=args.experiment_name, + director_host=args.director_host, + director_port=args.director_port, + collaborators=args.collaborators, + init_tensor_dict_path=args.init_tensor_dict_path, + root_certificate=args.root_certificate, + certificate=args.certificate, + private_key=args.private_key, + tls=args.tls, + )) diff --git a/openfl/component/envoy/envoy.py b/openfl/component/envoy/envoy.py index c363f5d046..4a79d5b1ea 100644 --- a/openfl/component/envoy/envoy.py +++ b/openfl/component/envoy/envoy.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Envoy module.""" - +import asyncio import logging import time import traceback @@ -15,6 
+15,8 @@ from click import echo +from openfl.docker import docker +from openfl.docker.docker import DockerConfig from openfl.federated import Plan from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor from openfl.plugins.processing_units_monitor.cuda_device_monitor import CUDADeviceMonitor @@ -41,6 +43,8 @@ def __init__( tls: bool = True, cuda_devices: Union[tuple, list] = (), cuda_device_monitor: Optional[Type[CUDADeviceMonitor]] = None, + shard_descriptor_config, + docker_config: DockerConfig, ) -> None: """Initialize a envoy object.""" self.name = shard_name @@ -59,8 +63,8 @@ def __init__( ) self.shard_descriptor = shard_descriptor + self.shard_descriptor_config = shard_descriptor_config self.cuda_devices = tuple(cuda_devices) - # Optional plugins self.cuda_device_monitor = cuda_device_monitor @@ -68,8 +72,11 @@ def __init__( self.running_experiments = {} self.is_experiment_running = False self._health_check_future = None + self.docker_config = docker_config + envoy_info = '\n'.join([f'{k}={v}' for k, v in self.__dict__.items()]) + logger.info(f'Envoy Info: \n{envoy_info}') - def run(self): + async def run(self): """Run of the envoy working cycle.""" while True: try: @@ -83,12 +90,19 @@ def run(self): data_file_path = self._save_data_stream_to_file(data_stream) self.is_experiment_running = True try: - with ExperimentWorkspace( - self.name + '_' + experiment_name, - data_file_path, - is_install_requirements=True - ): - self._run_collaborator() + if self.docker_config.use_docker: + await self._run_collaborator_in_docker( + experiment_name=experiment_name.lower(), + data_file_path=data_file_path, + ) + else: + with ExperimentWorkspace( + self.name + '_' + experiment_name, + data_file_path, + is_install_requirements=True + ): + self._run_collaborator() + except Exception as exc: logger.exception(f'Collaborator failed with error: {exc}:') self.director_client.set_experiment_failed( @@ -163,6 +177,41 @@ def _run_collaborator(self, 
plan='plan/plan.yaml'): col.set_available_devices(cuda=self.cuda_devices) col.run() + async def _run_collaborator_in_docker(self, experiment_name: str, data_file_path: Path): + """Run the collaborator for the experiment running.""" + docker_client = docker.Docker(config=self.docker_config) + docker_context_path = docker.create_collaborator_context( + data_file_path=data_file_path, + shard_descriptor_config=self.shard_descriptor_config, + ) + image_tag = await docker_client.build_image( + context_path=docker_context_path, + tag=f'{self.name}_{experiment_name}', + ) + cuda_devices = ','.join(map(str, self.cuda_devices)) + gpu_allowed = bool(cuda_devices) + + cmd = ( + f'python run.py ' + f'--name {self.name} ' + f'--plan_path plan/plan.yaml ' + f'--root_certificate {self.root_certificate} ' + f'--private_key {self.private_key} ' + f'--certificate {self.certificate} ' + f'--shard_config shard_descriptor_config.yaml ' + ) + if gpu_allowed: + cmd += f'--cuda_devices {cuda_devices} ' + + container = await docker_client.create_container( + name=f'{experiment_name}_collaborator_{self.name}', + image_tag=image_tag, + cmd=cmd, + gpu_allowed=gpu_allowed, + ) + await docker_client.start_and_monitor_container(container=container) + await container.delete(force=True) + def start(self): """Start the envoy.""" try: @@ -176,7 +225,7 @@ def start(self): # Shard accepted for participation in the federation logger.info('Shard accepted') self._health_check_future = self.executor.submit(self.send_health_check) - self.run() + asyncio.run(self.run()) else: # Shut down logger.error('Report shard info was not accepted') diff --git a/openfl/component/envoy/run_collaborator.py b/openfl/component/envoy/run_collaborator.py new file mode 100644 index 0000000000..fb352e9c4f --- /dev/null +++ b/openfl/component/envoy/run_collaborator.py @@ -0,0 +1,89 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Run collaborator in docker module.""" +import argparse 
+import logging +from importlib import import_module +from pathlib import Path +from typing import Optional +from typing import Tuple + +import yaml +from click import echo + +from openfl.federated import Plan +from openfl.interface.cli import setup_logging + +logger = logging.getLogger(__name__) + +setup_logging() + +PLAN_PATH_DEFAULT = 'plan/plan.yaml' + + +def _parse_args(): + parser = argparse.ArgumentParser(description='Run collaborator.') + parser.add_argument('--name', type=str) + parser.add_argument('--plan_path', type=str, nargs='?', default=PLAN_PATH_DEFAULT) + parser.add_argument('--root_certificate', type=str, nargs='?', default=None) + parser.add_argument('--private_key', type=str, nargs='?', default=None) + parser.add_argument('--certificate', type=str, nargs='?', default=None) + parser.add_argument('--shard_config', type=str, nargs='?', default=None) + parser.add_argument('--cuda_devices', type=str, nargs='?', default='') + return parser.parse_args() + + +def _run_collaborator( + plan_path: str, + name: str, + root_certificate: str, + private_key: str, + certificate: str, + shard_descriptor: str, + cuda_devices: Optional[Tuple[str]], +) -> None: + plan = Plan.parse(plan_config_path=Path(plan_path)) + echo(f'Data = {plan.cols_data_paths}') + logger.info('🧿 Starting a Collaborator Service.') + + col = plan.get_collaborator(name, root_certificate, private_key, + certificate, shard_descriptor=shard_descriptor) + if cuda_devices is not None: + col.set_available_devices(cuda=cuda_devices) + col.run() + + +def _shard_descriptor_from_config(shard_config: dict): + template = shard_config.get('template') + if not template: + raise Exception('You should define a shard ' + 'descriptor template in the envoy config') + class_name = template.split('.')[-1] + module_path = '.'.join(template.split('.')[:-1]) + params = shard_config.get('params', {}) + + module = import_module(module_path) + instance = getattr(module, class_name)(**params) + + return instance + + +if 
__name__ == '__main__': + args = _parse_args() + with open(args.shard_config) as f: + shard_descriptor_config = yaml.safe_load(f) + shard_descriptor = _shard_descriptor_from_config(shard_descriptor_config) + if args.cuda_devices != '': + cuda_devices = tuple(args.cuda_devices.split(',')) + else: + cuda_devices = () + _run_collaborator( + plan_path=args.plan_path, + name=args.name, + root_certificate=args.root_certificate, + private_key=args.private_key, + certificate=args.certificate, + shard_descriptor=shard_descriptor, + cuda_devices=cuda_devices, + ) diff --git a/openfl/docker/__init__.py b/openfl/docker/__init__.py new file mode 100644 index 0000000000..8c7da1f42a --- /dev/null +++ b/openfl/docker/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Docker package.""" diff --git a/openfl/docker/docker.py b/openfl/docker/docker.py new file mode 100644 index 0000000000..0b9dcf2dbc --- /dev/null +++ b/openfl/docker/docker.py @@ -0,0 +1,196 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Docker module.""" + +from dataclasses import dataclass +import logging +import os +from io import BytesIO +from pathlib import Path +from tarfile import TarFile +from typing import Dict +from typing import List +from typing import Optional +from dataclasses import field + +import aiodocker +import yaml +from aiodocker.containers import DockerContainer + +logger = logging.getLogger(__name__) + +OPENFL_ROOT_PATH = Path(__file__).parent.parent.parent.absolute() + + +@dataclass +class DockerConfig: + use_docker: bool = True + env: Dict[str, str] = field(default_factory=dict) + buildargs: Dict[str, str] = field(default_factory=dict) + volumes: List[str] = field(default_factory=list) + + @property + def binds(self) -> List[str]: + """Convert docker volumes to binds.""" + binds = [] + for volume in map(lambda x: x.split(':'), self.volumes): + target = volume[0] + bind = volume[0] if 
len(volume) == 1 else volume[1] + + if bind.startswith('./'): + bind = bind.replace('.', '/code', 1) + elif bind.startswith('~'): + bind = bind.replace('~', '/root', 1) + + if target.startswith('~'): + target = str(Path(target).expanduser().resolve()) + if target.startswith('./'): + target = str(Path(target).expanduser().resolve()) + binds.append(f'{target}:{bind}') + + return binds + + @property + def env_list(self): + env_lst = [] + for k, v in self.env.items(): + if v is None: + continue + env_lst.append(f'{k}={v}') + return env_lst + + +class Docker: + """Docker class.""" + + def __init__( + self, *, + config: Optional[DockerConfig] = None, + ): + """Initialize an docker object.""" + self.docker = aiodocker.Docker() + if config is None: + config = DockerConfig() + self.config = config + + async def build_image( + self, *, + context_path: Path, + tag: str, + ) -> str: + """Build docker image.""" + with open(context_path, 'rb') as f: + fileobj = BytesIO(f.read()) + build_image_iter = self.docker.images.build( + fileobj=fileobj, + encoding='gzip', + tag=tag, + stream=True, + buildargs=self.config.buildargs, + ) + async for message in build_image_iter: + if 'stream' not in message or len(message) > 1: + print(message) + logger.info(f'DOCKER BUILD {message.get("stream")}') + return f'{tag}:latest' + + async def create_container( + self, *, + name: str, + image_tag: str, + cmd: str, + gpu_allowed: bool = False, + ) -> DockerContainer: + """Create docker container.""" + config = { + 'Cmd': ['/bin/bash', '-c', cmd], + 'Env': self.config.env_list, + 'Image': image_tag, + 'HostConfig': { + 'NetworkMode': 'host', + 'Binds': self.config.binds, + 'ShmSize': 30 * 1024 * 1024 * 1024, + }, + } + if gpu_allowed: + config['HostConfig'].update(**{ + 'DeviceRequests': [{ + 'Driver': 'nvidia', + 'Count': -1, + 'Capabilities': [['gpu', 'compute', 'utility']], + }], + }) + + logger.info(f'{config=}') + + return await self.docker.containers.create_or_replace( + config=config, + 
name=name, + ) + + async def start_and_monitor_container( + self, *, + container: DockerContainer, + ) -> None: + """Start and monitor docker container.""" + subscriber = self.docker.events.subscribe() + await container.start() + logs_stream = container.log(stdout=True, stderr=True, follow=True) + async for log in logs_stream: + logger.info(f'CONTAINER {log}') + while True: + event = await subscriber.get() + if event is None: + break + + action = event.get('Action') + + if event['Actor']['ID'] == container._id: + if action == 'stop': + logger.info(f'=> deleted {container._id[:12]}') + break + elif action == 'destroy': + logger.info('=> done with this container!') + break + elif action == 'die': + logger.info('=> container is died') + break + + +def create_aggregator_context(data_file_path: Path, init_tensor_dict_path: Path): + """Create context for aggregator service.""" + with TarFile(name=data_file_path, mode='a') as tar_file: + docker_file_path = OPENFL_ROOT_PATH / 'openfl-docker' / 'Dockerfile.aggregator' + run_aggregator_path = (OPENFL_ROOT_PATH / 'openfl' / 'component' / 'director' + / 'run_aggregator.py') + tar_file.add(docker_file_path, 'Dockerfile') + tar_file.add(run_aggregator_path, 'run.py') + tar_file.add(init_tensor_dict_path, 'init_tensor_dict.pickle') + return data_file_path + + +def create_collaborator_context( + data_file_path: Path, + shard_descriptor_config +) -> Path: + """Create context for collaborator service.""" + with TarFile(name=data_file_path, mode='a') as tar_file: + docker_file_path = OPENFL_ROOT_PATH / 'openfl-docker' / 'Dockerfile.collaborator' + run_collaborator_path = (OPENFL_ROOT_PATH / 'openfl' / 'component' / 'envoy' + / 'run_collaborator.py') + + with open('shard_descriptor_config.yaml', 'w') as f: + yaml.dump(shard_descriptor_config, f) + tar_file.add('shard_descriptor_config.yaml', 'shard_descriptor_config.yaml') + os.remove('shard_descriptor_config.yaml') + + tar_file.add(docker_file_path, 'Dockerfile') + 
tar_file.add(run_collaborator_path, 'run.py') + + template = shard_descriptor_config['template'] + module_path = template.split('.')[:-1] + module_path[-1] = f'{module_path[-1]}.py' + shar_descriptor_path = str(Path.joinpath(Path('.'), *module_path)) + tar_file.add(shar_descriptor_path, shar_descriptor_path) + return data_file_path diff --git a/openfl/federated/plan/plan.py b/openfl/federated/plan/plan.py index c4acceab02..b9be0cb463 100644 --- a/openfl/federated/plan/plan.py +++ b/openfl/federated/plan/plan.py @@ -315,7 +315,17 @@ def get_tasks(self): tasks[task]['aggregation_type'] = aggregation_type return tasks - def get_aggregator(self, tensor_dict=None): + def get_aggregator( + self, + director_host, + director_port, + experiment_name: str = None, + tensor_dict=None, + tls=False, + root_certificate=None, + certificate=None, + private_key=None, + ): """Get federation aggregator.""" defaults = self.config.get('aggregator', { @@ -328,6 +338,13 @@ def get_aggregator(self, tensor_dict=None): defaults[SETTINGS]['authorized_cols'] = self.authorized_cols defaults[SETTINGS]['assigner'] = self.get_assigner() defaults[SETTINGS]['compression_pipeline'] = self.get_tensor_pipe() + defaults[SETTINGS]['root_certificate'] = root_certificate + defaults[SETTINGS]['certificate'] = certificate + defaults[SETTINGS]['private_key'] = private_key + defaults[SETTINGS]['tls'] = tls + defaults[SETTINGS]['director_host'] = director_host + defaults[SETTINGS]['director_port'] = director_port + defaults[SETTINGS]['experiment_name'] = experiment_name log_metric_callback = defaults[SETTINGS].get('log_metric_callback') if log_metric_callback: @@ -508,7 +525,8 @@ def get_client(self, collaborator_name, aggregator_uuid, federation_uuid, return self.client_ - def get_server(self, root_certificate=None, private_key=None, certificate=None, **kwargs): + def get_server(self, director_host, director_port, root_certificate=None, private_key=None, + certificate=None, **kwargs): """Get gRPC server of the 
aggregator instance.""" common_name = self.config['network'][SETTINGS]['agg_addr'].lower() @@ -526,15 +544,30 @@ def get_server(self, root_certificate=None, private_key=None, certificate=None, server_args['certificate'] = certificate server_args['private_key'] = private_key - server_args['aggregator'] = self.get_aggregator() + server_args['aggregator'] = self.get_aggregator( + director_host=director_host, + director_port=director_port, + root_certificate=root_certificate, + private_key=private_key, + certificate=certificate, + ) if self.server_ is None: self.server_ = AggregatorGRPCServer(**server_args) return self.server_ - def interactive_api_get_server(self, *, tensor_dict, root_certificate, certificate, - private_key, tls): + def interactive_api_get_server( + self, *, + experiment_name, + director_host, + director_port, + tensor_dict, + root_certificate, + certificate, + private_key, + tls, + ): """Get gRPC server of the aggregator instance.""" server_args = self.config['network'][SETTINGS] @@ -544,7 +577,16 @@ def interactive_api_get_server(self, *, tensor_dict, root_certificate, certifica server_args['private_key'] = private_key server_args['tls'] = tls - server_args['aggregator'] = self.get_aggregator(tensor_dict) + server_args['aggregator'] = self.get_aggregator( + experiment_name=experiment_name, + director_host=director_host, + director_port=director_port, + tensor_dict=tensor_dict, + root_certificate=root_certificate, + certificate=certificate, + private_key=private_key, + tls=tls, + ) if self.server_ is None: self.server_ = AggregatorGRPCServer(**server_args) diff --git a/openfl/interface/director.py b/openfl/interface/director.py index b07468b376..23433e92b2 100644 --- a/openfl/interface/director.py +++ b/openfl/interface/director.py @@ -15,6 +15,7 @@ from dynaconf import Validator from openfl.component.director import Director +from openfl.docker.docker import DockerConfig from openfl.interface.cli_helper import WORKSPACE from openfl.transport import 
DirectorGRPCServer from openfl.utilities import merge_configs @@ -44,7 +45,9 @@ def director(context): @option('-oc', '--public-cert-path', 'certificate', required=False, type=ClickPath(exists=True), default=None, help='Path to a signed certificate') -def start(director_config_path, tls, root_certificate, private_key, certificate): +@option('--use-docker/--no-use-docker', default=False, is_flag=True, + help='Use docker to run aggregator.') +def start(director_config_path, tls, root_certificate, private_key, certificate, use_docker): """Start the director service.""" director_config_path = Path(director_config_path).absolute() logger.info('🧿 Starting the Director Service.') @@ -85,6 +88,16 @@ def start(director_config_path, tls, root_certificate, private_key, certificate) if config.certificate: config.certificate = Path(config.certificate).absolute() + docker_config = config.as_dict().get('SETTINGS', {}).pop('docker', {}) + docker_env = docker_config.get('env', {}) + docker_buildargs = docker_config.get('buildargs', {}) + docker_volumes = docker_config.get('volumes', {}) + docker_config = DockerConfig( + use_docker=use_docker, + env=docker_env, + buildargs=docker_buildargs, + volumes=docker_volumes, + ) director_server = DirectorGRPCServer( director_cls=Director, tls=tls, @@ -96,6 +109,7 @@ def start(director_config_path, tls, root_certificate, private_key, certificate) settings=config.settings, listen_host=config.settings.listen_host, listen_port=config.settings.listen_port, + docker_config=docker_config, ) director_server.start() diff --git a/openfl/interface/envoy.py b/openfl/interface/envoy.py index 8d6a2564ce..dcfc695d27 100644 --- a/openfl/interface/envoy.py +++ b/openfl/interface/envoy.py @@ -16,6 +16,7 @@ from dynaconf import Validator from openfl.component.envoy.envoy import Envoy +from openfl.docker.docker import DockerConfig from openfl.interface.cli_helper import WORKSPACE from openfl.utilities import click_types from openfl.utilities import 
merge_configs @@ -48,8 +49,10 @@ def envoy(context): help='Path to a private key', type=ClickPath(exists=True)) @option('-oc', '--public-cert-path', 'certificate', default=None, help='Path to a signed certificate', type=ClickPath(exists=True)) +@option('--use-docker/--no-use-docker', default=False, is_flag=True, + help='Use docker to run collaborator.') def start_(shard_name, director_host, director_port, tls, envoy_config_path, - root_certificate, private_key, certificate): + root_certificate, private_key, certificate, use_docker): """Start the Envoy.""" logger.info('🧿 Starting the Envoy.') if is_directory_traversal(envoy_config_path): @@ -77,7 +80,7 @@ def start_(shard_name, director_host, director_port, tls, envoy_config_path, config.certificate = Path(config.certificate).absolute() # Parse envoy parameters - envoy_params = config.get('params', {}) + envoy_params = config.as_dict().get('PARAMS', {}) # Build optional plugin components optional_plugins_section = config.get('optional_plugin_components') @@ -93,18 +96,30 @@ def start_(shard_name, director_host, director_port, tls, envoy_config_path, module = import_module(module_path) instance = getattr(module, class_name)(**plugin_params) envoy_params[plugin_name] = instance - # Instantiate Shard Descriptor - shard_descriptor = shard_descriptor_from_config(config.get('shard_descriptor', {})) + shard_descriptor_config = config.as_dict().get('SHARD_DESCRIPTOR', {}) + shard_descriptor = shard_descriptor_from_config(shard_descriptor_config) + docker_config = envoy_params.pop('docker', {}) + docker_env = docker_config.get('env', {}) + docker_buildargs = docker_config.get('buildargs', {}) + docker_volumes = docker_config.get('volumes', []) + docker_config = DockerConfig( + use_docker=use_docker, + env=docker_env, + buildargs=docker_buildargs, + volumes=docker_volumes, + ) envoy = Envoy( shard_name=shard_name, director_host=director_host, director_port=director_port, tls=tls, shard_descriptor=shard_descriptor, + 
shard_descriptor_config=shard_descriptor_config, root_certificate=config.root_certificate, private_key=config.private_key, certificate=config.certificate, + docker_config=docker_config, **envoy_params ) diff --git a/openfl/interface/interactive_api/experiment.py b/openfl/interface/interactive_api/experiment.py index 73f02f8ef8..bef763590b 100644 --- a/openfl/interface/interactive_api/experiment.py +++ b/openfl/interface/interactive_api/experiment.py @@ -286,7 +286,7 @@ def _pack_the_workspace(): from os import makedirs from os.path import basename - archive_type = 'zip' + archive_type = 'tar' archive_name = basename(getcwd()) tmp_dir = 'temp_' + archive_name diff --git a/openfl/protocols/aggregator.proto b/openfl/protocols/aggregator.proto index 9d995dbc6b..d04f2c45d3 100644 --- a/openfl/protocols/aggregator.proto +++ b/openfl/protocols/aggregator.proto @@ -15,7 +15,8 @@ service Aggregator { rpc GetMetricStream(GetMetricStreamRequest) returns (stream GetMetricStreamResponse) {} rpc GetTrainedModel(GetTrainedModelRequest) returns (TrainedModelResponse) {} rpc GetExperimentDescription(GetExperimentDescriptionRequest) - returns (GetExperimentDescriptionResponse) {} + returns (GetExperimentDescriptionResponse) {} + rpc Stop(StopRequest) returns (StopResponse) {} } message MessageHeader { @@ -50,7 +51,7 @@ message GetAggregatedTensorRequest { string tensor_name = 2; int32 round_number = 3; bool report = 4; - repeated string tags = 5; + repeated string tags = 5; bool require_lossless = 6; } @@ -66,7 +67,7 @@ message TaskResults { MessageHeader header = 1; int32 round_number = 2; string task_name = 3; - int32 data_size = 4; + int32 data_size = 4; repeated NamedTensor tensors = 5; } @@ -108,3 +109,9 @@ message GetExperimentDescriptionRequest {} message GetExperimentDescriptionResponse { ExperimentDescription experiment = 1; } + +message StopRequest { + string failed_collaborator = 1; +} + +message StopResponse {} diff --git a/openfl/protocols/director.proto 
b/openfl/protocols/director.proto index 6e6922db1c..69d8d5b874 100644 --- a/openfl/protocols/director.proto +++ b/openfl/protocols/director.proto @@ -10,7 +10,8 @@ import "google/protobuf/duration.proto"; import "openfl/protocols/base.proto"; service Director { - // 1. Envoy RPCs + // 1. Services RPCs + // 1.1 Envoy RPCs rpc UpdateShardInfo(UpdateShardInfoRequest) returns (UpdateShardInfoResponse) {} // Shard owner could also provide some public data for tests rpc WaitExperiment(stream WaitExperimentRequest) returns (stream WaitExperimentResponse) {} @@ -18,10 +19,14 @@ service Director { rpc UpdateEnvoyStatus(UpdateEnvoyStatusRequest) returns (UpdateEnvoyStatusResponse) {} rpc SetExperimentFailed (SetExperimentFailedRequest) returns (SetExperimentFailedResponse) {} + // 1.2 Aggregator RPCs + rpc UploadExperimentModel(UploadExperimentModelRequest) + returns (UploadExperimentModelResponse) {} + // 2. Frontend RPCs // 2.1 Extension RPCs rpc GetExperimentDescription(GetExperimentDescriptionRequest) - returns (GetExperimentDescriptionResponse) {} + returns (GetExperimentDescriptionResponse) {} rpc GetExperimentsList(GetExperimentsListRequest) returns (GetExperimentsListResponse){} // 2.2 API RPCs @@ -43,7 +48,7 @@ message CudaDeviceInfo { string name = 7; } -// Envoy Messages +// Services. Envoy Messages message NodeInfo { string name = 1; @@ -104,6 +109,20 @@ message SetExperimentFailedRequest { message SetExperimentFailedResponse {} +// Services. Aggregator messages. + +message UploadExperimentModelRequest { + enum ModelType { + LAST_BEST_MODEL = 0; + LAST_MODEL = 1; + } + ModelProto model_proto = 1; + ModelType model_type = 2; + string experiment_name = 3; +} + +message UploadExperimentModelResponse {} + // Frontend. Extension Messages. 
message GetExperimentsListRequest {} diff --git a/openfl/transport/grpc/aggregator_client.py b/openfl/transport/grpc/aggregator_client.py index 61e19ad48e..0d5ae603ae 100644 --- a/openfl/transport/grpc/aggregator_client.py +++ b/openfl/transport/grpc/aggregator_client.py @@ -10,6 +10,7 @@ from typing import Tuple import grpc +from google.protobuf.json_format import MessageToDict from openfl.pipelines import NoCompressionPipeline from openfl.protocols import aggregator_pb2 @@ -536,4 +537,9 @@ async def get_experiment_description(self): """Get experiment info.""" request = aggregator_pb2.GetExperimentDescriptionRequest() response = await self.stub.GetExperimentDescription(request) - return response.experiment + return MessageToDict(response.experiment) + + async def stop(self, failed_collaborator: str = None): + request = aggregator_pb2.StopRequest(failed_collaborator=failed_collaborator) + response = await self.stub.Stop(request) + return response diff --git a/openfl/transport/grpc/aggregator_server.py b/openfl/transport/grpc/aggregator_server.py index b0bf81834e..1c6e7b3609 100644 --- a/openfl/transport/grpc/aggregator_server.py +++ b/openfl/transport/grpc/aggregator_server.py @@ -333,6 +333,9 @@ def GetExperimentDescription(self, request, context): # NOQA:N802 ), ) + def Stop(self, request, context): # NOQA:N802 + self.aggregator.stop(failed_collaborator=request.failed_collaborator) + def get_server(self): """Return gRPC server.""" self.server = server(ThreadPoolExecutor(max_workers=cpu_count() + 1), diff --git a/openfl/transport/grpc/director_client.py b/openfl/transport/grpc/director_client.py index 5a873a129a..912cc113f7 100644 --- a/openfl/transport/grpc/director_client.py +++ b/openfl/transport/grpc/director_client.py @@ -309,3 +309,22 @@ def get_experiment_description(self, name): director_pb2.GetExperimentDescriptionRequest(name=name) ) return response.experiment + + def upload_experiment_model(self, experiment_name: str, model_proto, model_type: str = 
'last'): + if model_type == 'best': + model_type = director_pb2.UploadExperimentModelRequest.LAST_BEST_MODEL + elif model_type == 'last': + model_type = director_pb2.UploadExperimentModelRequest.LAST_MODEL + else: + raise ValueError( + f'Invalid {model_type=} for upload_experiment_model function. ' + f'Allowed values: "last", "best"' + ) + response = self.stub.UploadExperimentModel( + director_pb2.UploadExperimentModelRequest( + model_proto=model_proto, + model_type=model_type, + experiment_name=experiment_name, + ) + ) + return response diff --git a/openfl/transport/grpc/director_server.py b/openfl/transport/grpc/director_server.py index 6c2d5432c7..9040faadc7 100644 --- a/openfl/transport/grpc/director_server.py +++ b/openfl/transport/grpc/director_server.py @@ -15,9 +15,11 @@ from grpc import aio from grpc import ssl_server_credentials +from openfl.docker.docker import DockerConfig from openfl.pipelines import NoCompressionPipeline from openfl.protocols import director_pb2 from openfl.protocols import director_pb2_grpc +from openfl.protocols import utils from openfl.protocols.utils import construct_model_proto from openfl.protocols.utils import deconstruct_model_proto from openfl.protocols.utils import get_headers @@ -39,6 +41,7 @@ def __init__( certificate: Optional[Union[Path, str]] = None, listen_host: str = '[::]', listen_port: int = 50051, + docker_config: DockerConfig, **kwargs, ) -> None: """Initialize a director object.""" @@ -58,6 +61,9 @@ def __init__( root_certificate=self.root_certificate, private_key=self.private_key, certificate=self.certificate, + director_host=listen_host, + director_port=listen_port, + docker_config=docker_config, **kwargs ) @@ -160,11 +166,18 @@ async def GetTrainedModel(self, request, context): # NOQA:N802 logger.info('Request GetTrainedModel has got!') caller = self.get_caller(context) + model_type_enum = director_pb2.UploadExperimentModelRequest.ModelType + if request.model_type == model_type_enum.LAST_MODEL: + model_type = 
'last' + elif request.model_type == model_type_enum.LAST_BEST_MODEL: + model_type = 'best' + else: + raise Exception(f'Invalid {request.model_type=} in GetTrainedModel function.') trained_model_dict = await self.director.get_trained_model( experiment_name=request.experiment_name, caller=caller, - model_type=request.model_type + model_type=model_type ) if trained_model_dict is None: @@ -243,7 +256,7 @@ async def SetExperimentFailed(self, request, context): # NOQA:N802 logger.error(f'Collaborator {request.collaborator_name} was failed with error code:' f' {request.error_code}, error_description: {request.error_description}' f'Stopping experiment.') - self.director.set_experiment_failed( + await self.director.set_experiment_failed( experiment_name=request.experiment_name, collaborator_name=request.collaborator_name ) @@ -303,3 +316,21 @@ async def GetExperimentDescription(self, request, context): # NOQA:N802 experiment = await self.director.get_experiment_description(caller, request.name) return director_pb2.GetExperimentDescriptionResponse(experiment=experiment) + + async def UploadExperimentModel(self, request, context): # NOQA:N802 + """Upload an experiment model from aggregator.""" + model_type_enum = director_pb2.UploadExperimentModelRequest.ModelType + if request.model_type == model_type_enum.LAST_MODEL: + model_type = 'last' + elif request.model_type == model_type_enum.LAST_BEST_MODEL: + model_type = 'best' + else: + raise Exception('Invalid model_type value in UploadExperimentModel function.') + tensor_dict, _ = utils.deconstruct_model_proto( + request.model_proto, compression_pipeline=NoCompressionPipeline()) + await self.director.upload_experiment_model( + experiment_name=request.experiment_name, + tensor_dict=tensor_dict, + model_type=model_type, + ) + return director_pb2.UploadExperimentModelResponse() diff --git a/openfl/utilities/workspace.py b/openfl/utilities/workspace.py index 08847f76aa..5c367ab6d0 100644 --- a/openfl/utilities/workspace.py +++ 
b/openfl/utilities/workspace.py @@ -61,7 +61,7 @@ def __enter__(self): shutil.rmtree(self.experiment_work_dir) os.makedirs(self.experiment_work_dir) - shutil.unpack_archive(self.data_file_path, self.experiment_work_dir, format='zip') + shutil.unpack_archive(self.data_file_path, self.experiment_work_dir, format='tar') if self.is_install_requirements: self._install_requirements() @@ -71,13 +71,15 @@ def __enter__(self): # This is needed for python module finder sys.path.append(self.experiment_work_dir) + return self.experiment_work_dir + def __exit__(self, exc_type, exc_value, traceback): """Remove the workspace.""" os.chdir(self.cwd) shutil.rmtree(self.experiment_name, ignore_errors=True) if self.experiment_work_dir in sys.path: sys.path.remove(self.experiment_work_dir) - os.remove(self.data_file_path) + self.data_file_path.unlink(missing_ok=True) def dump_requirements_file( @@ -93,7 +95,7 @@ def dump_requirements_file( if prefixes is None: prefixes = set() elif type(prefixes) is str: - prefixes = set(prefixes,) + prefixes = set(prefixes, ) else: prefixes = set(prefixes) diff --git a/setup.py b/setup.py index 1e79a0b422..1a07112b5f 100644 --- a/setup.py +++ b/setup.py @@ -84,6 +84,7 @@ def run(self): 'openfl.component.envoy', 'openfl.cryptography', 'openfl.databases', + 'openfl.docker', 'openfl.federated', 'openfl.federated.data', 'openfl.federated.plan', @@ -113,6 +114,7 @@ def run(self): ], include_package_data=True, install_requires=[ + 'aiodocker==0.21.0', 'Click==8.0.1', 'PyYAML>=5.4.1', 'cloudpickle', @@ -127,7 +129,7 @@ def run(self): 'pandas', 'protobuf', 'requests', - 'rich==9.1.0', + 'rich==12.0.1', 'scikit-learn', 'tensorboard', 'tensorboardX',