diff --git a/docs/mermaid/director_aggregator.mmd b/docs/mermaid/director_aggregator.mmd
new file mode 100644
index 0000000000..69cf6f11df
--- /dev/null
+++ b/docs/mermaid/director_aggregator.mmd
@@ -0,0 +1,22 @@
+sequenceDiagram
+ participant D as Director
+ participant A as Aggregator
+ rect rgb(0, 255, 0,.1)
+ Note over D: An Experiment's start
+ D->>D: Get next experiment
from Experiment Registry
+ opt Docker specific logic
+ D->>D: Create aggregator docker build
context from experiment workspace.
(Add Dockerfile and execution script
to context specific for aggregator)
+ D->>D: Build aggregator docker image
+ D->>D: Create aggregator docker container
+ D->>D: Run aggregator docker container
+ D->>D: Monitor aggregator docker container
+ end
+ loop every round
+ A->>D: Send last/best model to director
+ D->>D: Save model on director
+ end
+ opt Docker specific logic
+ D->>D: Delete aggregator docker container
when experiment was finished
+ end
+ end
+ Note over D: The Experiment ended.
The Federation keeps existing.
\ No newline at end of file
diff --git a/docs/mermaid/director_envoy.mmd b/docs/mermaid/director_envoy.mmd
new file mode 100644
index 0000000000..00bda8ac0e
--- /dev/null
+++ b/docs/mermaid/director_envoy.mmd
@@ -0,0 +1,55 @@
+sequenceDiagram
+ participant N as NoteBook
+ participant D as Director
+ participant E as Envoy
+ rect rgb(0, 255, 0,.1)
+ Note over D,E: A Federation startup process
+ D->D: Starts
+ E->E: Adapting a dataset
+ E->E: Starts
+ Note over D,E: Exchange certs
+ E-->>D: Connects using FQDN and pwd
+ E-->>D: Communicates dataset info to Director
+ D-->D: Keeps a list of connected Envoys
+ D-->D: Ensures unified data interface
+ end
+ Note over D,E: We consider a Federation set up
+ rect rgb(0, 255, 0,.1)
+ Note over N,D: Create new experiment
+ N->>N: Prepare experiment in Notebook
+ N->>N: Connect to federation
+ N->>N: Run experiment
+ N->>D: Send Experiment workspace
+ D->>D: Create new experiment
+ D->>D: Add experiment to registry
+ end
+ rect rgb(0, 255, 0,.1)
+ Note over D,E: An Experiment's start
+ D->>D: Get next experiment
from Experiment Registry
+ opt Docker specific logic
+ D->>D: Create aggregator docker build
context from experiment workspace.
(Add Dockerfile and execution script
to context specific for aggregator)
+ D->>D: Build aggregator docker image
+ D->>D: Create aggregator docker container
+ D->>D: Run aggregator docker container
+ D->>D: Monitor aggregator docker container
+ end
+ E->>D: WaitExperiment
+ D-->>E: Send Experiment name
+ E->>D: GetExperimentData(experiment_name)
+ D-->>E: Send Experiment workspace
+ opt Docker specific logic
+ E->>E: Create collaborator docker build
context from Experiment workspace.
(Add Dockerfile and execution script
to context specific for collaborator)
+ E->>E: Build collaborator docker image
+ E->>E: Create collaborator docker container
+ E->>E: Run collaborator docker container
+ E->>E: Monitor collaborator docker container
+ end
+ Note over D,E: Wait for last round finished
+ opt Docker specific logic
+ E->>E: Delete collaborator docker container
when experiment was finished
+ D->>D: Delete aggregator docker container
when experiment was finished
+ end
+ end
+ N->>D: Get best model
+ D-->>N: Send best model
+ Note over D,E: The Experiment ended.
The Federation keeps existing.
\ No newline at end of file
diff --git a/openfl-docker/Dockerfile.aggregator b/openfl-docker/Dockerfile.aggregator
new file mode 100644
index 0000000000..ac466b2865
--- /dev/null
+++ b/openfl-docker/Dockerfile.aggregator
@@ -0,0 +1,7 @@
+FROM python:3.8
+
+RUN pip install --upgrade pip
+RUN pip install git+https://github.com/dmitryagapov/openfl.git@feature/run_collaborator_in_docker
+
+COPY . /code
+WORKDIR /code
\ No newline at end of file
diff --git a/openfl-docker/Dockerfile.collaborator b/openfl-docker/Dockerfile.collaborator
new file mode 100644
index 0000000000..8c152da1c9
--- /dev/null
+++ b/openfl-docker/Dockerfile.collaborator
@@ -0,0 +1,10 @@
+FROM python:3.8
+
+RUN pip install --upgrade pip
+RUN pip install git+https://github.com/dmitryagapov/openfl.git@feature/run_collaborator_in_docker
+
+WORKDIR /code
+COPY ./requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
diff --git a/openfl-tutorials/interactive_api/MXNet_landmarks/director/start_director_docker.sh b/openfl-tutorials/interactive_api/MXNet_landmarks/director/start_director_docker.sh
new file mode 100755
index 0000000000..265548d3f8
--- /dev/null
+++ b/openfl-tutorials/interactive_api/MXNet_landmarks/director/start_director_docker.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+
+fx director start --disable-tls -c director_config.yaml --use-docker
diff --git a/openfl-tutorials/interactive_api/MXNet_landmarks/envoy/start_envoy_docker.sh b/openfl-tutorials/interactive_api/MXNet_landmarks/envoy/start_envoy_docker.sh
new file mode 100755
index 0000000000..72a15413ed
--- /dev/null
+++ b/openfl-tutorials/interactive_api/MXNet_landmarks/envoy/start_envoy_docker.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+set -e
+ENVOY_NAME=$1
+SHARD_CONF=$2
+
+fx envoy start -n "$ENVOY_NAME" --disable-tls --envoy-config-path "$SHARD_CONF" -dh localhost -dp 50051
diff --git a/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml b/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml
index 0e997c2edd..65d416eb3f 100644
--- a/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml
+++ b/openfl-tutorials/interactive_api/PyTorch_DogsCats_ViT/envoy/envoy_config_one.yaml
@@ -1,13 +1,16 @@
params:
- cuda_devices: [0]
-
+ cuda_devices: [ 0 ]
+
optional_plugin_components:
- cuda_device_monitor:
- template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
- settings: []
+ cuda_device_monitor:
+ template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
+ settings: [ ]
shard_descriptor:
template: dogs_cats_shard_descriptor.DogsCatsShardDescriptor
+ volumes:
+ - '~/.kaggle/kaggle.json'
+ - './data'
params:
data_folder: data
rank_worldsize: 1,2
diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml
index 860e0436f5..86849ed0ca 100644
--- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml
+++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/director_config.yaml
@@ -1,6 +1,15 @@
settings:
listen_host: localhost
listen_port: 50050
- sample_shape: ['300', '400', '3']
- target_shape: ['300', '400']
+ sample_shape: [ '300', '400', '3' ]
+ target_shape: [ '300', '400' ]
envoy_health_check_period: 5 # in seconds
+ docker:
+ env:
+ http_proxy:
+ https_proxy:
+ no_proxy:
+ buildargs:
+ HTTP_PROXY:
+ HTTPS_PROXY:
+ NO_PROXY:
diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/start_director_docker.sh b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/start_director_docker.sh
new file mode 100755
index 0000000000..ce839b9765
--- /dev/null
+++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/director/start_director_docker.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+
+fx director start --disable-tls -c director_config.yaml --use-docker
\ No newline at end of file
diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml
index aae095f4e7..09fca7774f 100644
--- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml
+++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config.yaml
@@ -1,10 +1,21 @@
params:
- cuda_devices: [0,2]
-
+ cuda_devices: [ 0, 2 ]
+ docker:
+ env:
+ http_proxy:
+ https_proxy:
+ no_proxy:
+ buildargs:
+ HTTP_PROXY:
+ HTTPS_PROXY:
+ NO_PROXY:
+ volumes:
+ - './kvasir_data'
+
optional_plugin_components:
- cuda_device_monitor:
- template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
- settings: []
+ cuda_device_monitor:
+ template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
+ settings: [ ]
shard_descriptor:
template: kvasir_shard_descriptor.KvasirShardDescriptor
@@ -12,3 +23,5 @@ shard_descriptor:
data_folder: kvasir_data
rank_worldsize: 1,10
enforce_image_hw: '300,400'
+
+
diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml
index 1c121e534a..ed99b0de46 100644
--- a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml
+++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/envoy_config_no_gpu.yaml
@@ -1,7 +1,11 @@
params:
- cuda_devices: []
+ cuda_devices: [ ]
+ docker_env:
+ http_proxy:
+ https_proxy:
+ no_proxy:
-optional_plugin_components: {}
+optional_plugin_components: { }
shard_descriptor:
template: kvasir_shard_descriptor.KvasirShardDescriptor
diff --git a/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/start_envoy_docker.sh b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/start_envoy_docker.sh
new file mode 100755
index 0000000000..e8249f1b24
--- /dev/null
+++ b/openfl-tutorials/interactive_api/PyTorch_Kvasir_UNet/envoy/start_envoy_docker.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+
+fx envoy start -n env_one --disable-tls --envoy-config-path envoy_config.yaml -dh localhost -dp 50050 --use-docker
diff --git a/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py b/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py
index 12bfed9df4..6c47172618 100644
--- a/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py
+++ b/openfl-tutorials/interactive_api/numpy_linear_regression/envoy/linreg_shard_descriptor.py
@@ -33,7 +33,7 @@ def get_dataset(self, dataset_type: str) -> np.ndarray:
"""
Return a shard dataset by type.
- A simple list with elements (x, y) implemets the Shard Dataset interface.
+ A simple list with elements (x, y) implements the Shard Dataset interface.
"""
if dataset_type == 'train':
return self.data[:self.n_samples // 2]
diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py
index dd1fc591ea..c95515c659 100644
--- a/openfl/component/aggregator/aggregator.py
+++ b/openfl/component/aggregator/aggregator.py
@@ -4,7 +4,9 @@
"""Aggregator module."""
import queue
from logging import getLogger
+from pathlib import Path
from typing import List
+from typing import Union
from openfl.component.aggregation_functions import WeightedAverage
from openfl.databases import TensorDB
@@ -12,6 +14,7 @@
from openfl.pipelines import TensorCodec
from openfl.protocols import base_pb2
from openfl.protocols import utils
+from openfl.transport.grpc.director_client import DirectorClient
from openfl.utilities import TaskResultKey
from openfl.utilities import TensorKey
from openfl.utilities.logs import write_metric
@@ -33,24 +36,33 @@ class Aggregator:
\* - plan setting.
"""
- def __init__(self,
-
- aggregator_uuid,
- federation_uuid,
- authorized_cols,
-
- init_state_path,
- best_state_path,
- last_state_path,
-
- assigner,
-
- rounds_to_train=256,
- single_col_cert_common_name=None,
- compression_pipeline=None,
- db_store_rounds=1,
- write_logs=False,
- **kwargs):
+ def __init__(
+ self,
+
+ aggregator_uuid,
+ federation_uuid,
+ authorized_cols,
+
+ init_state_path,
+ best_state_path,
+ last_state_path,
+
+ assigner,
+
+ director_host,
+ director_port,
+ experiment_name: str = None,
+ rounds_to_train=256,
+ single_col_cert_common_name=None,
+ compression_pipeline=None,
+ db_store_rounds=1,
+ write_logs=False,
+ tls: bool = False,
+ root_certificate: Union[Path, str] = None,
+ private_key: Union[Path, str] = None,
+ certificate: Union[Path, str] = None,
+ **kwargs
+ ) -> None:
"""Initialize."""
self.round_number = 0
self.single_col_cert_common_name = single_col_cert_common_name
@@ -110,6 +122,17 @@ def __init__(self,
self.collaborator_tasks_results = {} # {TaskResultKey: list of TensorKeys}
self.collaborator_task_weight = {} # {TaskResultKey: data_size}
+ self.experiment_name = experiment_name
+
+ self.director_client = DirectorClient(
+ client_id=f'aggregator_{experiment_name}',
+ director_host=director_host,
+ director_port=director_port,
+ tls=tls,
+ root_certificate=root_certificate,
+ private_key=private_key,
+ certificate=certificate,
+ )
def _load_initial_tensors(self):
"""
@@ -517,7 +540,8 @@ def send_local_task_results(self, collaborator_name, round_number, task_name,
'task_name': task_name,
'metric_name': tensor_key.tensor_name,
'metric_value': metric_value,
- 'round': round_number}
+ 'round': round_number
+ }
if self.write_logs:
self.log_metric(tensor_key.tags[-1], task_name,
tensor_key.tensor_name, nparray, round_number)
@@ -607,10 +631,14 @@ def _process_named_tensor(self, named_tensor, collaborator_name):
The numpy array associated with the returned tensorkey
"""
raw_bytes = named_tensor.data_bytes
- metadata = [{'int_to_float': proto.int_to_float,
- 'int_list': proto.int_list,
- 'bool_list': proto.bool_list}
- for proto in named_tensor.transformer_metadata]
+ metadata = [
+ {
+ 'int_to_float': proto.int_to_float,
+ 'int_list': proto.int_list,
+ 'bool_list': proto.bool_list,
+ }
+ for proto in named_tensor.transformer_metadata
+ ]
# The tensor has already been transfered to aggregator,
# so the newly constructed tensor should have the aggregator origin
tensor_key = TensorKey(
@@ -793,7 +821,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result
# Finally, cache the updated model tensor
self.tensor_db.cache_tensor({final_model_tk: new_model_nparray})
- def _compute_validation_related_task_metrics(self, task_name):
+ def _compute_validation_related_task_metrics(self, task_name) -> bool:
"""
Compute all validation related metrics.
@@ -801,6 +829,7 @@ def _compute_validation_related_task_metrics(self, task_name):
task_name : str
The task name to compute
"""
+ is_best_model = False
# By default, print out all of the metrics that the validation
# task sent
# This handles getting the subset of collaborators that may be
@@ -845,7 +874,8 @@ def _compute_validation_related_task_metrics(self, task_name):
'task_name': task_name,
'metric_name': tensor_key.tensor_name,
'metric_value': agg_results.item(),
- 'round': round_number}
+ 'round': round_number,
+ }
if agg_results is None:
self.logger.warning(
@@ -872,8 +902,25 @@ def _compute_validation_related_task_metrics(self, task_name):
f'model with score {agg_results:f}')
self.best_model_score = agg_results
self._save_model(round_number, self.best_state_path)
+ is_best_model = True
if 'trained' in tags:
self._prepare_trained(tensor_name, origin, round_number, report, agg_results)
+ return is_best_model
+
+ def upload_model_to_aggregator(self, model_type: str = 'last'):
+ if model_type not in ['last', 'best']:
+ raise ValueError(
+ f'Invalid {model_type=} for upload_model_to_aggregator function. '
+ f'Allowed values "last", "best"'
+ )
+ model_proto = utils.construct_model_proto(
+ self.last_tensor_dict, 0, NoCompressionPipeline()
+ )
+ self.director_client.upload_experiment_model(
+ experiment_name=self.experiment_name,
+ model_proto=model_proto,
+ model_type=model_type
+ )
def _end_of_round_check(self):
"""
@@ -894,8 +941,11 @@ def _end_of_round_check(self):
# Compute all validation related metrics
all_tasks = self.assigner.get_all_tasks_for_round(self.round_number)
+ is_best_model = False
for task_name in all_tasks:
- self._compute_validation_related_task_metrics(task_name)
+ _is_best_model = self._compute_validation_related_task_metrics(task_name)
+ if _is_best_model:
+ is_best_model = True
# Once all of the task results have been processed
self.round_number += 1
@@ -903,6 +953,8 @@ def _end_of_round_check(self):
# Save the latest model
self.logger.info(f'Saving round {self.round_number} model...')
self._save_model(self.round_number, self.last_state_path)
+ model_type = 'best' if is_best_model else 'last'
+ self.upload_model_to_aggregator(model_type=model_type)
# TODO This needs to be fixed!
if self._time_to_quit():
@@ -943,7 +995,9 @@ def _log_big_warning(self):
def stop(self, failed_collaborator: str = None) -> None:
"""Stop aggregator execution."""
self.logger.info('Force stopping the aggregator execution.')
- for collaborator_name in filter(lambda c: c != failed_collaborator, self.authorized_cols):
+
+ for collaborator_name in filter(lambda c: c != failed_collaborator,
+ self.authorized_cols):
self.logger.info(f'Sending signal to collaborator {collaborator_name} to shutdown...')
self.quit_job_sent_to.append(collaborator_name)
diff --git a/openfl/component/director/director.py b/openfl/component/director/director.py
index 8cd0689531..fadaacf328 100644
--- a/openfl/component/director/director.py
+++ b/openfl/component/director/director.py
@@ -1,17 +1,24 @@
-# Copyright (C) 2020-2021 Intel Corporation
+# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Director module."""
import asyncio
import logging
+import pickle
+import shutil
import time
+import uuid
from collections import defaultdict
from pathlib import Path
+from tarfile import TarFile
from typing import Iterable
+from typing import List
from typing import Union
from typing import ValuesView
+from openfl.docker.docker import DockerConfig
+from openfl.federated import Plan
from openfl.protocols import base_pb2
from openfl.transport import AsyncAggregatorGRPCClient
from .experiment import Experiment
@@ -28,13 +35,16 @@ class Director:
def __init__(
self, *,
+ director_host,
+ director_port,
tls: bool = True,
root_certificate: Union[Path, str] = None,
private_key: Union[Path, str] = None,
certificate: Union[Path, str] = None,
sample_shape: list = None,
target_shape: list = None,
- settings: dict = None
+ settings: dict = None,
+ docker_config: DockerConfig,
) -> None:
"""Initialize a director object."""
self.sample_shape, self.target_shape = sample_shape, target_shape
@@ -47,6 +57,9 @@ def __init__(
self.settings = settings or {}
self.col_exp_queues = defaultdict(asyncio.Queue)
self.col_exp = {}
+ self.director_host = director_host
+ self.director_port = director_port
+ self.docker_config = docker_config
def acknowledge_shard(self, shard_info: dict) -> bool:
"""Save shard info to shard registry if it's acceptable."""
@@ -76,13 +89,34 @@ async def set_new_experiment(
experiment_archive_path: Path,
) -> bool:
"""Set new experiment."""
+ tensor_dict_path = Path('tmp') / f'{uuid.uuid4()}' / f'{experiment_name}.pickle'
+ tensor_dict_path = tensor_dict_path.absolute()
+ tensor_dict_path.parent.mkdir(parents=True, exist_ok=True)
+
+ with tensor_dict_path.open('wb') as f:
+ pickle.dump(tensor_dict, f)
+ plan = self._parse_plan(experiment_archive_path)
+ aggregator_client = AsyncAggregatorGRPCClient(
+ agg_addr=plan.agg_addr,
+ agg_port=plan.agg_port,
+ tls=self.tls,
+ disable_client_auth=not self.tls,
+ root_certificate=self.root_certificate,
+ certificate=self.certificate,
+ private_key=self.private_key
+ )
experiment = Experiment(
name=experiment_name,
archive_path=experiment_archive_path,
collaborators=list(collaborator_names),
users=[sender_name],
sender=sender_name,
- init_tensor_dict=tensor_dict,
+ init_tensor_dict_path=tensor_dict_path,
+ docker_config=self.docker_config,
+ director_host=self.director_host,
+ director_port=self.director_port,
+ plan=plan,
+ aggregator_client=aggregator_client,
)
self.experiments_registry.add(experiment)
return True
@@ -90,8 +124,6 @@ async def set_new_experiment(
async def get_aggregator_client(self, experiment_name):
"""Return an aggregator client for the experiment."""
exp = self.experiments_registry[experiment_name]
- while exp.status != Status.IN_PROGRESS:
- await asyncio.sleep(1)
agg_port = exp.plan.agg_port
agg_addr = exp.plan.agg_addr
logger.info(f'Aggregator uri: {agg_addr}:{agg_port}')
@@ -114,16 +146,16 @@ async def get_trained_model(self, experiment_name: str, caller: str, model_type:
logger.error('No experiment data in the stash')
return None
exp = self.experiments_registry[experiment_name]
- if exp.status != Status.IN_PROGRESS:
- return None
- aggregator_client = await self.get_aggregator_client(experiment_name)
- trained_model = await aggregator_client.get_trained_model(
- experiment_name,
- model_type
- )
-
- return trained_model
+ if model_type == 'last':
+ return exp.last_tensor_dict
+ elif model_type == 'best':
+ return exp.best_tensor_dict
+ else:
+ raise ValueError(
+ f'Invalid value {model_type=} in function get_trained_model. '
+ f'Allowed values: "last", "best"'
+ )
def get_experiment_data(self, experiment_name: str) -> Path:
"""Get experiment data."""
@@ -165,15 +197,14 @@ async def stream_metrics(self, experiment_name: str, caller: str):
Raises:
StopIteration - if the experiment is finished and there is no more metrics to report
"""
- if (experiment_name not in self.experiments_registry
- or caller not in self.experiments_registry[experiment_name].users):
+ exp = self.experiments_registry.get(experiment_name)
+ if exp is None or caller not in exp.users:
raise Exception(
f'No experiment name "{experiment_name}" in experiments list, or caller "{caller}"'
f' does not have access to this experiment'
)
- aggregator_client = await self.get_aggregator_client(experiment_name)
- async for metric_dict in aggregator_client.get_metric_stream():
+ async for metric_dict in await exp.get_metric_stream():
yield metric_dict
def remove_experiment_data(self, experiment_name: str, caller: str):
@@ -182,11 +213,11 @@ def remove_experiment_data(self, experiment_name: str, caller: str):
and caller in self.experiments_registry[experiment_name].users):
self.experiments_registry.remove(experiment_name)
- def set_experiment_failed(self, *, experiment_name: str, collaborator_name: str):
+ async def set_experiment_failed(self, *, experiment_name: str, collaborator_name: str):
"""Set experiment failed."""
- if experiment_name in self.experiments_registry:
- aggregator = self.experiments_registry[experiment_name].aggregator
- aggregator.stop(failed_collaborator=collaborator_name)
+ exp = self.experiments_registry.get(experiment_name)
+ if exp is not None:
+ await exp.stop(collaborator_name)
def update_envoy_status(
self, *,
@@ -215,36 +246,25 @@ def get_envoys(self) -> ValuesView:
"""Get a status information about envoys."""
logger.info(f'Shard registry: {self._shard_registry}')
for envoy_info in self._shard_registry.values():
- envoy_info['is_online'] = (
- time.time() < envoy_info.get('last_updated', 0)
- + envoy_info.get('valid_duration', 0)
- )
+ last_updated = envoy_info.get('last_updated', 0)
+ valid_duration = envoy_info.get('valid_duration', 0)
+ envoy_info['is_online'] = time.time() < last_updated + valid_duration
envoy_name = envoy_info['shard_info']['node_info']['name']
envoy_info['experiment_name'] = self.col_exp.get(envoy_name)
return self._shard_registry.values()
- async def get_experiments_list(self, caller: str) -> list:
+ async def get_experiments_list(self, caller: str) -> List[dict]:
"""Get experiments list for specific user."""
experiments = self.experiments_registry.get_user_experiments(caller)
result = []
for exp in experiments:
- exp_data = {
- 'name': exp.name,
- 'status': exp.status,
- 'collaborators_amount': len(exp.collaborators),
- }
- if exp.status == Status.IN_PROGRESS:
- aggregator_client = await self.get_aggregator_client(exp.name)
- experiment_pb2 = await aggregator_client.get_experiment_description()
- exp_data['progress'] = experiment_pb2.progress
- exp_data['tasks_amount'] = len(experiment_pb2.tasks)
- result.append(exp_data)
-
+ description = await exp.get_description()
+ result.append(description)
return result
async def get_experiment_description(self, caller: str, experiment_name: str) -> dict:
- """Get a experiment information by name for specific user."""
+ """Get an experiment information by name for specific user."""
exp = self.experiments_registry.get(experiment_name)
if not exp or caller not in exp.users:
logger.info(f'Experiment {experiment_name} for user {caller} does not exist.')
@@ -254,12 +274,8 @@ async def get_experiment_description(self, caller: str, experiment_name: str) ->
name=exp.name,
status=exp.status,
)
- aggregator_client = await self.get_aggregator_client(experiment_name)
- experiment_pb2 = await aggregator_client.get_experiment_description()
- experiment_pb2.name = experiment_name
- experiment_pb2.status = exp.status
-
- return experiment_pb2
+ description = await exp.get_description()
+ return description
async def start_experiment_execution_loop(self):
"""Run task to monitor and run experiments."""
@@ -276,3 +292,36 @@ async def start_experiment_execution_loop(self):
queue = self.col_exp_queues[col_name]
await queue.put(experiment.name)
await run_aggregator_future
+
+ async def upload_experiment_model(
+ self,
+ experiment_name: str,
+ tensor_dict: dict,
+ model_type: str,
+ ) -> None:
+ if model_type not in ['last', 'best']:
+ raise ValueError(
+ f'Invalid {model_type=} in upload_experiment_model function. '
+ f'Allowed values "last", "best"'
+ )
+ exp = self.experiments_registry[experiment_name]
+ if model_type == 'last':
+ exp.last_tensor_dict = tensor_dict
+ if model_type == 'best':
+ exp.best_tensor_dict = tensor_dict
+
+ @staticmethod
+ def _parse_plan(archive_path):
+ plan_path = Path('plan/plan.yaml')
+ with TarFile(name=archive_path, mode='r') as tar_file:
+ plan_buffer = tar_file.extractfile(f'./{plan_path}')
+ if plan_buffer is None:
+ raise Exception(f'No {plan_path} in workspace.')
+ plan_data = plan_buffer.read()
+ tmp_plan_path = Path('tmp') / f'{uuid.uuid4()}' / 'plan.yaml'
+ tmp_plan_path.parent.mkdir(parents=True, exist_ok=True)
+ with tmp_plan_path.open('wb') as plan_f:
+ plan_f.write(plan_data)
+ plan = Plan.parse(plan_config_path=tmp_plan_path)
+ shutil.rmtree(tmp_plan_path.parent, ignore_errors=True)
+ return plan
diff --git a/openfl/component/director/experiment.py b/openfl/component/director/experiment.py
index c3c6537a50..63b1969339 100644
--- a/openfl/component/director/experiment.py
+++ b/openfl/component/director/experiment.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 Intel Corporation
+# Copyright (C) 2021-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Experiment module."""
@@ -7,12 +7,19 @@
import logging
from contextlib import asynccontextmanager
from pathlib import Path
+from tarfile import TarFile
from typing import Iterable
from typing import List
from typing import Union
+import numpy as np
+from google.protobuf.json_format import MessageToDict
+
+from openfl.docker import docker
+from openfl.docker.docker import DockerConfig
from openfl.federated import Plan
from openfl.transport import AggregatorGRPCServer
+from openfl.transport import AsyncAggregatorGRPCClient
from openfl.utilities.workspace import ExperimentWorkspace
logger = logging.getLogger(__name__)
@@ -25,6 +32,8 @@ class Status:
FINISHED = 'finished'
IN_PROGRESS = 'in_progress'
FAILED = 'failed'
+ DOCKER_BUILD_IN_PROGRESS = 'docker_build_in_progress'
+ DOCKER_CREATE_CONTAINER = 'docker_create_container'
class Experiment:
@@ -36,9 +45,14 @@ def __init__(
archive_path: Union[Path, str],
collaborators: List[str],
sender: str,
- init_tensor_dict: dict,
+ init_tensor_dict_path: Union[Path, str],
+ director_host: str,
+ director_port: str,
+ aggregator_client: AsyncAggregatorGRPCClient,
+ plan: Plan,
plan_path: Union[Path, str] = 'plan/plan.yaml',
users: Iterable[str] = None,
+ docker_config: DockerConfig,
) -> None:
"""Initialize an experiment object."""
self.name = name
@@ -47,14 +61,17 @@ def __init__(
self.archive_path = archive_path
self.collaborators = collaborators
self.sender = sender
- self.init_tensor_dict = init_tensor_dict
- if isinstance(plan_path, str):
- plan_path = Path(plan_path)
- self.plan_path = plan_path
- self.plan = None
+ self.init_tensor_dict_path = Path(init_tensor_dict_path)
+ self.plan_path = Path(plan_path)
+ self.plan = plan
self.users = set() if users is None else set(users)
self.status = Status.PENDING
- self.aggregator = None
+ self.aggregator_client = aggregator_client
+ self.director_host = director_host
+ self.director_port = director_port
+ self.last_tensor_dict = {}
+ self.best_tensor_dict = {}
+ self.docker_config = docker_config
async def start(
self, *,
@@ -62,43 +79,118 @@ async def start(
root_certificate: Union[Path, str] = None,
private_key: Union[Path, str] = None,
certificate: Union[Path, str] = None,
- ):
+ ) -> None:
"""Run experiment."""
- self.status = Status.IN_PROGRESS
try:
logger.info(f'New experiment {self.name} for '
f'collaborators {self.collaborators}')
- with ExperimentWorkspace(self.name, self.archive_path):
- aggregator_grpc_server = self._create_aggregator_grpc_server(
+ if self.docker_config.use_docker:
+ await self._run_aggregator_in_docker(
+ data_file_path=self.archive_path,
tls=tls,
root_certificate=root_certificate,
private_key=private_key,
certificate=certificate,
)
- self.aggregator = aggregator_grpc_server.aggregator
- await self._run_aggregator_grpc_server(
- aggregator_grpc_server=aggregator_grpc_server,
- )
+ else:
+ self.status = Status.IN_PROGRESS
+ with ExperimentWorkspace(self.name, self.archive_path):
+ aggregator_grpc_server = self._create_aggregator_grpc_server(
+ director_host=self.director_host,
+ director_port=self.director_port,
+ tls=tls,
+ root_certificate=root_certificate,
+ private_key=private_key,
+ certificate=certificate,
+ )
+ await self._run_aggregator_grpc_server(
+ aggregator_grpc_server=aggregator_grpc_server,
+ )
+
self.status = Status.FINISHED
logger.info(f'Experiment "{self.name}" was finished successfully.')
except Exception as e:
self.status = Status.FAILED
logger.exception(f'Experiment "{self.name}" was failed with error: {e}.')
+ async def stop(self, failed_collaborator: str = None):
+ await self.aggregator_client.stop(failed_collaborator=failed_collaborator)
+
+ async def get_description(self) -> dict:
+ description = {
+ 'name': self.name,
+ 'status': self.status,
+ 'collaborators_amount': len(self.collaborators),
+ }
+
+ if self.status == Status.IN_PROGRESS:
+ description_pb2 = await self.aggregator_client.get_experiment_description()
+ description = MessageToDict(description_pb2)
+
+ return description
+
+ async def get_metric_stream(self):
+ return self.aggregator_client.get_metric_stream()
+
+ async def _run_aggregator_in_docker(
+ self, *,
+ data_file_path: Path,
+ tls: bool = False,
+ root_certificate: Union[Path, str] = None,
+ private_key: Union[Path, str] = None,
+ certificate: Union[Path, str] = None,
+ ) -> None:
+ self.status = Status.DOCKER_BUILD_IN_PROGRESS
+ docker_client = docker.Docker(config=self.docker_config)
+ docker_context_path = docker.create_aggregator_context(
+ data_file_path=data_file_path,
+ init_tensor_dict_path=self.init_tensor_dict_path,
+ )
+ image_tag = await docker_client.build_image(
+ context_path=docker_context_path,
+ tag='aggregator',
+ )
+ cmd = (
+ f'python run.py '
+ f'--experiment_name {self.name} '
+ f'--director_host {self.director_host} '
+ f'--director_port {self.director_port} '
+ f'--init_tensor_dict_path init_tensor_dict.pickle '
+ f'--collaborators {" ".join(self.collaborators)} '
+ f'--root_certificate {root_certificate} '
+ f'--private_key {private_key} '
+ f'--certificate {certificate} '
+ f'{"--tls " if tls else "--no-tls "}'
+ )
+ self.status = Status.DOCKER_CREATE_CONTAINER
+ self.container = await docker_client.create_container(
+ name=f'{self.name.lower()}_aggregator',
+ image_tag=image_tag,
+ cmd=cmd,
+ )
+ self.status = Status.IN_PROGRESS
+ await docker_client.start_and_monitor_container(container=self.container)
+ await self.container.delete(force=True)
+
def _create_aggregator_grpc_server(
self, *,
+ director_host,
+ director_port,
tls: bool = True,
root_certificate: Union[Path, str] = None,
private_key: Union[Path, str] = None,
certificate: Union[Path, str] = None,
) -> AggregatorGRPCServer:
- self.plan = Plan.parse(plan_config_path=Path(self.plan_path))
self.plan.authorized_cols = list(self.collaborators)
logger.info('🧿 Starting the Aggregator Service.')
+ init_tensor_dict = np.load(str(self.init_tensor_dict_path), allow_pickle=True)
aggregator_grpc_server = self.plan.interactive_api_get_server(
- tensor_dict=self.init_tensor_dict,
+ experiment_name=self.name,
+ director_host=director_host,
+ director_port=director_port,
+ tensor_dict=init_tensor_dict,
root_certificate=root_certificate,
certificate=certificate,
private_key=private_key,
@@ -106,6 +198,19 @@ def _create_aggregator_grpc_server(
)
return aggregator_grpc_server
+ def _parse_plan(self):
+ with TarFile(name=self.archive_path, mode='r') as tar_file:
+ plan_buffer = tar_file.extractfile(f'./{self.plan_path}')
+ if plan_buffer is None:
+ raise Exception(f'No {self.plan_path} in workspace.')
+ plan_data = plan_buffer.read()
+ local_plan_path = Path(self.name) / self.plan_path
+ local_plan_path.parent.mkdir(parents=True, exist_ok=True)
+ with local_plan_path.open('wb') as plan_f:
+ plan_f.write(plan_data)
+ plan = Plan.parse(plan_config_path=local_plan_path)
+ return plan
+
@staticmethod
async def _run_aggregator_grpc_server(aggregator_grpc_server: AggregatorGRPCServer) -> None:
"""Run aggregator."""
@@ -135,6 +240,8 @@ def __init__(self) -> None:
self.__pending_experiments = []
self.__archived_experiments = []
self.__dict = {}
+ self.last_tensor_dict = {}
+ self.best_tensor_dict = {}
@property
def active_experiment(self) -> Union[Experiment, None]:
diff --git a/openfl/component/director/run_aggregator.py b/openfl/component/director/run_aggregator.py
new file mode 100644
index 0000000000..e5bd9f8780
--- /dev/null
+++ b/openfl/component/director/run_aggregator.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+"""Script to run aggregator in docker."""
+
+import argparse
+import asyncio
+import logging
+from pathlib import Path
+
+import numpy as np
+
+from openfl.federated import Plan
+from openfl.interface.cli import setup_logging
+
+logger = logging.getLogger(__name__)
+
+setup_logging()
+
+PLAN_PATH_DEFAULT = 'plan/plan.yaml'
+
+
+def _parse_args():
+ parser = argparse.ArgumentParser(description='Run aggregator.')
+ parser.add_argument('--plan_path', type=str, nargs='?', default=PLAN_PATH_DEFAULT)
+
+ parser.add_argument('--experiment_name', type=str)
+ parser.add_argument('--director_host', type=str)
+ parser.add_argument('--director_port', type=int)
+
+ parser.add_argument('--collaborators', type=str, nargs='+')
+ parser.add_argument('--init_tensor_dict_path', type=str)
+ parser.add_argument('--root_certificate', type=str, nargs='?', default=None)
+ parser.add_argument('--private_key', type=str, nargs='?', default=None)
+ parser.add_argument('--certificate', type=str, nargs='?', default=None)
+ parser.add_argument('--tls', dest='tls', action='store_true')
+ parser.add_argument('--no-tls', dest='tls', action='store_false')
+ parser.set_defaults(tls=True)
+
+ return parser.parse_args()
+
+
+async def main(
+ plan_path,
+ experiment_name,
+ director_host,
+ director_port,
+ collaborators,
+ init_tensor_dict_path,
+ root_certificate,
+ certificate,
+ private_key,
+ tls,
+):
+ """Run main function."""
+ plan = Plan.parse(plan_config_path=Path(plan_path))
+ plan.authorized_cols = list(collaborators)
+
+ logger.info('🧿 Starting the Aggregator Service.')
+ init_tensor_dict = np.load(init_tensor_dict_path, allow_pickle=True)
+ aggregator_grpc_server = plan.interactive_api_get_server(
+ experiment_name=experiment_name,
+ director_host=director_host,
+ director_port=director_port,
+ tensor_dict=init_tensor_dict,
+ root_certificate=root_certificate,
+ certificate=certificate,
+ private_key=private_key,
+ tls=tls,
+ )
+
+ logger.info('🧿 Starting the Aggregator Service.')
+ grpc_server = aggregator_grpc_server.get_server()
+ grpc_server.start()
+ logger.info('Starting Aggregator gRPC Server')
+
+ try:
+ while not aggregator_grpc_server.aggregator.all_quit_jobs_sent():
+            # Wait until quit jobs have been sent to all collaborators
+ await asyncio.sleep(10)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ grpc_server.stop(0)
+ # Temporary solution to free RAM used by TensorDB
+ aggregator_grpc_server.aggregator.tensor_db.clean_up(0)
+
+
+if __name__ == '__main__':
+ args = _parse_args()
+ asyncio.run(main(
+ plan_path=args.plan_path,
+ experiment_name=args.experiment_name,
+ director_host=args.director_host,
+ director_port=args.director_port,
+ collaborators=args.collaborators,
+ init_tensor_dict_path=args.init_tensor_dict_path,
+ root_certificate=args.root_certificate,
+ certificate=args.certificate,
+ private_key=args.private_key,
+ tls=args.tls,
+ ))
diff --git a/openfl/component/envoy/envoy.py b/openfl/component/envoy/envoy.py
index c363f5d046..4a79d5b1ea 100644
--- a/openfl/component/envoy/envoy.py
+++ b/openfl/component/envoy/envoy.py
@@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
"""Envoy module."""
-
+import asyncio
import logging
import time
import traceback
@@ -15,6 +15,8 @@
from click import echo
+from openfl.docker import docker
+from openfl.docker.docker import DockerConfig
from openfl.federated import Plan
from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor
from openfl.plugins.processing_units_monitor.cuda_device_monitor import CUDADeviceMonitor
@@ -41,6 +43,8 @@ def __init__(
tls: bool = True,
cuda_devices: Union[tuple, list] = (),
cuda_device_monitor: Optional[Type[CUDADeviceMonitor]] = None,
+ shard_descriptor_config,
+ docker_config: DockerConfig,
) -> None:
"""Initialize a envoy object."""
self.name = shard_name
@@ -59,8 +63,8 @@ def __init__(
)
self.shard_descriptor = shard_descriptor
+ self.shard_descriptor_config = shard_descriptor_config
self.cuda_devices = tuple(cuda_devices)
-
# Optional plugins
self.cuda_device_monitor = cuda_device_monitor
@@ -68,8 +72,11 @@ def __init__(
self.running_experiments = {}
self.is_experiment_running = False
self._health_check_future = None
+ self.docker_config = docker_config
+ envoy_info = '\n'.join([f'{k}={v}' for k, v in self.__dict__.items()])
+ logger.info(f'Envoy Info: \n{envoy_info}')
- def run(self):
+ async def run(self):
"""Run of the envoy working cycle."""
while True:
try:
@@ -83,12 +90,19 @@ def run(self):
data_file_path = self._save_data_stream_to_file(data_stream)
self.is_experiment_running = True
try:
- with ExperimentWorkspace(
- self.name + '_' + experiment_name,
- data_file_path,
- is_install_requirements=True
- ):
- self._run_collaborator()
+ if self.docker_config.use_docker:
+ await self._run_collaborator_in_docker(
+ experiment_name=experiment_name.lower(),
+ data_file_path=data_file_path,
+ )
+ else:
+ with ExperimentWorkspace(
+ self.name + '_' + experiment_name,
+ data_file_path,
+ is_install_requirements=True
+ ):
+ self._run_collaborator()
+
except Exception as exc:
logger.exception(f'Collaborator failed with error: {exc}:')
self.director_client.set_experiment_failed(
@@ -163,6 +177,41 @@ def _run_collaborator(self, plan='plan/plan.yaml'):
col.set_available_devices(cuda=self.cuda_devices)
col.run()
+ async def _run_collaborator_in_docker(self, experiment_name: str, data_file_path: Path):
+ """Run the collaborator for the experiment running."""
+ docker_client = docker.Docker(config=self.docker_config)
+ docker_context_path = docker.create_collaborator_context(
+ data_file_path=data_file_path,
+ shard_descriptor_config=self.shard_descriptor_config,
+ )
+ image_tag = await docker_client.build_image(
+ context_path=docker_context_path,
+ tag=f'{self.name}_{experiment_name}',
+ )
+ cuda_devices = ','.join(map(str, self.cuda_devices))
+ gpu_allowed = bool(cuda_devices)
+
+ cmd = (
+ f'python run.py '
+ f'--name {self.name} '
+ f'--plan_path plan/plan.yaml '
+ f'--root_certificate {self.root_certificate} '
+ f'--private_key {self.private_key} '
+ f'--certificate {self.certificate} '
+ f'--shard_config shard_descriptor_config.yaml '
+ )
+ if gpu_allowed:
+ cmd += f'--cuda_devices {cuda_devices} '
+
+ container = await docker_client.create_container(
+ name=f'{experiment_name}_collaborator_{self.name}',
+ image_tag=image_tag,
+ cmd=cmd,
+ gpu_allowed=gpu_allowed,
+ )
+ await docker_client.start_and_monitor_container(container=container)
+ await container.delete(force=True)
+
def start(self):
"""Start the envoy."""
try:
@@ -176,7 +225,7 @@ def start(self):
# Shard accepted for participation in the federation
logger.info('Shard accepted')
self._health_check_future = self.executor.submit(self.send_health_check)
- self.run()
+ asyncio.run(self.run())
else:
# Shut down
logger.error('Report shard info was not accepted')
diff --git a/openfl/component/envoy/run_collaborator.py b/openfl/component/envoy/run_collaborator.py
new file mode 100644
index 0000000000..fb352e9c4f
--- /dev/null
+++ b/openfl/component/envoy/run_collaborator.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+"""Run collaborator in docker module."""
+import argparse
+import logging
+from importlib import import_module
+from pathlib import Path
+from typing import Optional
+from typing import Tuple
+
+import yaml
+from click import echo
+
+from openfl.federated import Plan
+from openfl.interface.cli import setup_logging
+
+logger = logging.getLogger(__name__)
+
+setup_logging()
+
+PLAN_PATH_DEFAULT = 'plan/plan.yaml'
+
+
+def _parse_args():
+ parser = argparse.ArgumentParser(description='Run collaborator.')
+ parser.add_argument('--name', type=str)
+ parser.add_argument('--plan_path', type=str, nargs='?', default=PLAN_PATH_DEFAULT)
+ parser.add_argument('--root_certificate', type=str, nargs='?', default=None)
+ parser.add_argument('--private_key', type=str, nargs='?', default=None)
+ parser.add_argument('--certificate', type=str, nargs='?', default=None)
+ parser.add_argument('--shard_config', type=str, nargs='?', default=None)
+ parser.add_argument('--cuda_devices', type=str, nargs='?', default='')
+ return parser.parse_args()
+
+
+def _run_collaborator(
+ plan_path: str,
+ name: str,
+ root_certificate: str,
+ private_key: str,
+ certificate: str,
+ shard_descriptor: str,
+ cuda_devices: Optional[Tuple[str]],
+) -> None:
+ plan = Plan.parse(plan_config_path=Path(plan_path))
+ echo(f'Data = {plan.cols_data_paths}')
+ logger.info('🧿 Starting a Collaborator Service.')
+
+ col = plan.get_collaborator(name, root_certificate, private_key,
+ certificate, shard_descriptor=shard_descriptor)
+ if cuda_devices is not None:
+ col.set_available_devices(cuda=cuda_devices)
+ col.run()
+
+
+def _shard_descriptor_from_config(shard_config: dict):
+ template = shard_config.get('template')
+ if not template:
+ raise Exception('You should define a shard '
+ 'descriptor template in the envoy config')
+ class_name = template.split('.')[-1]
+ module_path = '.'.join(template.split('.')[:-1])
+ params = shard_config.get('params', {})
+
+ module = import_module(module_path)
+ instance = getattr(module, class_name)(**params)
+
+ return instance
+
+
+if __name__ == '__main__':
+ args = _parse_args()
+ with open(args.shard_config) as f:
+ shard_descriptor_config = yaml.safe_load(f)
+ shard_descriptor = _shard_descriptor_from_config(shard_descriptor_config)
+ if args.cuda_devices != '':
+ cuda_devices = tuple(args.cuda_devices.split(','))
+ else:
+ cuda_devices = ()
+ _run_collaborator(
+ plan_path=args.plan_path,
+ name=args.name,
+ root_certificate=args.root_certificate,
+ private_key=args.private_key,
+ certificate=args.certificate,
+ shard_descriptor=shard_descriptor,
+ cuda_devices=cuda_devices,
+ )
diff --git a/openfl/docker/__init__.py b/openfl/docker/__init__.py
new file mode 100644
index 0000000000..8c7da1f42a
--- /dev/null
+++ b/openfl/docker/__init__.py
@@ -0,0 +1,4 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+"""Docker package."""
diff --git a/openfl/docker/docker.py b/openfl/docker/docker.py
new file mode 100644
index 0000000000..0b9dcf2dbc
--- /dev/null
+++ b/openfl/docker/docker.py
@@ -0,0 +1,196 @@
+# Copyright (C) 2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+"""Docker module."""
+
+from dataclasses import dataclass
+import logging
+import os
+from io import BytesIO
+from pathlib import Path
+from tarfile import TarFile
+from typing import Dict
+from typing import List
+from typing import Optional
+from dataclasses import field
+
+import aiodocker
+import yaml
+from aiodocker.containers import DockerContainer
+
+logger = logging.getLogger(__name__)
+
+OPENFL_ROOT_PATH = Path(__file__).parent.parent.parent.absolute()
+
+
+@dataclass
+class DockerConfig:
+ use_docker: bool = True
+ env: Dict[str, str] = field(default_factory=dict)
+ buildargs: Dict[str, str] = field(default_factory=dict)
+ volumes: List[str] = field(default_factory=list)
+
+ @property
+ def binds(self) -> List[str]:
+ """Convert docker volumes to binds."""
+ binds = []
+ for volume in map(lambda x: x.split(':'), self.volumes):
+ target = volume[0]
+ bind = volume[0] if len(volume) == 1 else volume[1]
+
+ if bind.startswith('./'):
+ bind = bind.replace('.', '/code', 1)
+ elif bind.startswith('~'):
+ bind = bind.replace('~', '/root', 1)
+
+ if target.startswith('~'):
+ target = str(Path(target).expanduser().resolve())
+ if target.startswith('./'):
+ target = str(Path(target).expanduser().resolve())
+ binds.append(f'{target}:{bind}')
+
+ return binds
+
+ @property
+ def env_list(self):
+ env_lst = []
+ for k, v in self.env.items():
+ if v is None:
+ continue
+ env_lst.append(f'{k}={v}')
+ return env_lst
+
+
+class Docker:
+ """Docker class."""
+
+ def __init__(
+ self, *,
+ config: Optional[DockerConfig] = None,
+ ):
+ """Initialize an docker object."""
+ self.docker = aiodocker.Docker()
+ if config is None:
+ config = DockerConfig()
+ self.config = config
+
+ async def build_image(
+ self, *,
+ context_path: Path,
+ tag: str,
+ ) -> str:
+ """Build docker image."""
+ with open(context_path, 'rb') as f:
+ fileobj = BytesIO(f.read())
+ build_image_iter = self.docker.images.build(
+ fileobj=fileobj,
+ encoding='gzip',
+ tag=tag,
+ stream=True,
+ buildargs=self.config.buildargs,
+ )
+ async for message in build_image_iter:
+ if 'stream' not in message or len(message) > 1:
+ print(message)
+ logger.info(f'DOCKER BUILD {message.get("stream")}')
+ return f'{tag}:latest'
+
+ async def create_container(
+ self, *,
+ name: str,
+ image_tag: str,
+ cmd: str,
+ gpu_allowed: bool = False,
+ ) -> DockerContainer:
+ """Create docker container."""
+ config = {
+ 'Cmd': ['/bin/bash', '-c', cmd],
+ 'Env': self.config.env_list,
+ 'Image': image_tag,
+ 'HostConfig': {
+ 'NetworkMode': 'host',
+ 'Binds': self.config.binds,
+ 'ShmSize': 30 * 1024 * 1024 * 1024,
+ },
+ }
+ if gpu_allowed:
+ config['HostConfig'].update(**{
+ 'DeviceRequests': [{
+ 'Driver': 'nvidia',
+ 'Count': -1,
+ 'Capabilities': [['gpu', 'compute', 'utility']],
+ }],
+ })
+
+ logger.info(f'{config=}')
+
+ return await self.docker.containers.create_or_replace(
+ config=config,
+ name=name,
+ )
+
+ async def start_and_monitor_container(
+ self, *,
+ container: DockerContainer,
+ ) -> None:
+ """Start and monitor docker container."""
+ subscriber = self.docker.events.subscribe()
+ await container.start()
+ logs_stream = container.log(stdout=True, stderr=True, follow=True)
+ async for log in logs_stream:
+ logger.info(f'CONTAINER {log}')
+ while True:
+ event = await subscriber.get()
+ if event is None:
+ break
+
+ action = event.get('Action')
+
+ if event['Actor']['ID'] == container._id:
+ if action == 'stop':
+ logger.info(f'=> deleted {container._id[:12]}')
+ break
+ elif action == 'destroy':
+ logger.info('=> done with this container!')
+ break
+ elif action == 'die':
+ logger.info('=> container is died')
+ break
+
+
+def create_aggregator_context(data_file_path: Path, init_tensor_dict_path: Path):
+ """Create context for aggregator service."""
+ with TarFile(name=data_file_path, mode='a') as tar_file:
+ docker_file_path = OPENFL_ROOT_PATH / 'openfl-docker' / 'Dockerfile.aggregator'
+ run_aggregator_path = (OPENFL_ROOT_PATH / 'openfl' / 'component' / 'director'
+ / 'run_aggregator.py')
+ tar_file.add(docker_file_path, 'Dockerfile')
+ tar_file.add(run_aggregator_path, 'run.py')
+ tar_file.add(init_tensor_dict_path, 'init_tensor_dict.pickle')
+ return data_file_path
+
+
+def create_collaborator_context(
+ data_file_path: Path,
+ shard_descriptor_config
+) -> Path:
+ """Create context for collaborator service."""
+ with TarFile(name=data_file_path, mode='a') as tar_file:
+ docker_file_path = OPENFL_ROOT_PATH / 'openfl-docker' / 'Dockerfile.collaborator'
+ run_collaborator_path = (OPENFL_ROOT_PATH / 'openfl' / 'component' / 'envoy'
+ / 'run_collaborator.py')
+
+ with open('shard_descriptor_config.yaml', 'w') as f:
+ yaml.dump(shard_descriptor_config, f)
+ tar_file.add('shard_descriptor_config.yaml', 'shard_descriptor_config.yaml')
+ os.remove('shard_descriptor_config.yaml')
+
+ tar_file.add(docker_file_path, 'Dockerfile')
+ tar_file.add(run_collaborator_path, 'run.py')
+
+ template = shard_descriptor_config['template']
+ module_path = template.split('.')[:-1]
+ module_path[-1] = f'{module_path[-1]}.py'
+ shar_descriptor_path = str(Path.joinpath(Path('.'), *module_path))
+ tar_file.add(shar_descriptor_path, shar_descriptor_path)
+ return data_file_path
diff --git a/openfl/federated/plan/plan.py b/openfl/federated/plan/plan.py
index c4acceab02..b9be0cb463 100644
--- a/openfl/federated/plan/plan.py
+++ b/openfl/federated/plan/plan.py
@@ -315,7 +315,17 @@ def get_tasks(self):
tasks[task]['aggregation_type'] = aggregation_type
return tasks
- def get_aggregator(self, tensor_dict=None):
+ def get_aggregator(
+ self,
+ director_host,
+ director_port,
+ experiment_name: str = None,
+ tensor_dict=None,
+ tls=False,
+ root_certificate=None,
+ certificate=None,
+ private_key=None,
+ ):
"""Get federation aggregator."""
defaults = self.config.get('aggregator',
{
@@ -328,6 +338,13 @@ def get_aggregator(self, tensor_dict=None):
defaults[SETTINGS]['authorized_cols'] = self.authorized_cols
defaults[SETTINGS]['assigner'] = self.get_assigner()
defaults[SETTINGS]['compression_pipeline'] = self.get_tensor_pipe()
+ defaults[SETTINGS]['root_certificate'] = root_certificate
+ defaults[SETTINGS]['certificate'] = certificate
+ defaults[SETTINGS]['private_key'] = private_key
+ defaults[SETTINGS]['tls'] = tls
+ defaults[SETTINGS]['director_host'] = director_host
+ defaults[SETTINGS]['director_port'] = director_port
+ defaults[SETTINGS]['experiment_name'] = experiment_name
log_metric_callback = defaults[SETTINGS].get('log_metric_callback')
if log_metric_callback:
@@ -508,7 +525,8 @@ def get_client(self, collaborator_name, aggregator_uuid, federation_uuid,
return self.client_
- def get_server(self, root_certificate=None, private_key=None, certificate=None, **kwargs):
+ def get_server(self, director_host, director_port, root_certificate=None, private_key=None,
+ certificate=None, **kwargs):
"""Get gRPC server of the aggregator instance."""
common_name = self.config['network'][SETTINGS]['agg_addr'].lower()
@@ -526,15 +544,30 @@ def get_server(self, root_certificate=None, private_key=None, certificate=None,
server_args['certificate'] = certificate
server_args['private_key'] = private_key
- server_args['aggregator'] = self.get_aggregator()
+ server_args['aggregator'] = self.get_aggregator(
+ director_host=director_host,
+ director_port=director_port,
+ root_certificate=root_certificate,
+ private_key=private_key,
+ certificate=certificate,
+ )
if self.server_ is None:
self.server_ = AggregatorGRPCServer(**server_args)
return self.server_
- def interactive_api_get_server(self, *, tensor_dict, root_certificate, certificate,
- private_key, tls):
+ def interactive_api_get_server(
+ self, *,
+ experiment_name,
+ director_host,
+ director_port,
+ tensor_dict,
+ root_certificate,
+ certificate,
+ private_key,
+ tls,
+ ):
"""Get gRPC server of the aggregator instance."""
server_args = self.config['network'][SETTINGS]
@@ -544,7 +577,16 @@ def interactive_api_get_server(self, *, tensor_dict, root_certificate, certifica
server_args['private_key'] = private_key
server_args['tls'] = tls
- server_args['aggregator'] = self.get_aggregator(tensor_dict)
+ server_args['aggregator'] = self.get_aggregator(
+ experiment_name=experiment_name,
+ director_host=director_host,
+ director_port=director_port,
+ tensor_dict=tensor_dict,
+ root_certificate=root_certificate,
+ certificate=certificate,
+ private_key=private_key,
+ tls=tls,
+ )
if self.server_ is None:
self.server_ = AggregatorGRPCServer(**server_args)
diff --git a/openfl/interface/director.py b/openfl/interface/director.py
index b07468b376..23433e92b2 100644
--- a/openfl/interface/director.py
+++ b/openfl/interface/director.py
@@ -15,6 +15,7 @@
from dynaconf import Validator
from openfl.component.director import Director
+from openfl.docker.docker import DockerConfig
from openfl.interface.cli_helper import WORKSPACE
from openfl.transport import DirectorGRPCServer
from openfl.utilities import merge_configs
@@ -44,7 +45,9 @@ def director(context):
@option('-oc', '--public-cert-path', 'certificate', required=False,
type=ClickPath(exists=True), default=None,
help='Path to a signed certificate')
-def start(director_config_path, tls, root_certificate, private_key, certificate):
+@option('--use-docker/--no-use-docker', default=False, is_flag=True,
+ help='Use docker to run aggregator.')
+def start(director_config_path, tls, root_certificate, private_key, certificate, use_docker):
"""Start the director service."""
director_config_path = Path(director_config_path).absolute()
logger.info('🧿 Starting the Director Service.')
@@ -85,6 +88,16 @@ def start(director_config_path, tls, root_certificate, private_key, certificate)
if config.certificate:
config.certificate = Path(config.certificate).absolute()
+ docker_config = config.as_dict().get('SETTINGS', {}).pop('docker', {})
+ docker_env = docker_config.get('env', {})
+ docker_buildargs = docker_config.get('buildargs', {})
+ docker_volumes = docker_config.get('volumes', {})
+ docker_config = DockerConfig(
+ use_docker=use_docker,
+ env=docker_env,
+ buildargs=docker_buildargs,
+ volumes=docker_volumes,
+ )
director_server = DirectorGRPCServer(
director_cls=Director,
tls=tls,
@@ -96,6 +109,7 @@ def start(director_config_path, tls, root_certificate, private_key, certificate)
settings=config.settings,
listen_host=config.settings.listen_host,
listen_port=config.settings.listen_port,
+ docker_config=docker_config,
)
director_server.start()
diff --git a/openfl/interface/envoy.py b/openfl/interface/envoy.py
index 8d6a2564ce..dcfc695d27 100644
--- a/openfl/interface/envoy.py
+++ b/openfl/interface/envoy.py
@@ -16,6 +16,7 @@
from dynaconf import Validator
from openfl.component.envoy.envoy import Envoy
+from openfl.docker.docker import DockerConfig
from openfl.interface.cli_helper import WORKSPACE
from openfl.utilities import click_types
from openfl.utilities import merge_configs
@@ -48,8 +49,10 @@ def envoy(context):
help='Path to a private key', type=ClickPath(exists=True))
@option('-oc', '--public-cert-path', 'certificate', default=None,
help='Path to a signed certificate', type=ClickPath(exists=True))
+@option('--use-docker/--no-use-docker', default=False, is_flag=True,
+ help='Use docker to run collaborator.')
def start_(shard_name, director_host, director_port, tls, envoy_config_path,
- root_certificate, private_key, certificate):
+ root_certificate, private_key, certificate, use_docker):
"""Start the Envoy."""
logger.info('🧿 Starting the Envoy.')
if is_directory_traversal(envoy_config_path):
@@ -77,7 +80,7 @@ def start_(shard_name, director_host, director_port, tls, envoy_config_path,
config.certificate = Path(config.certificate).absolute()
# Parse envoy parameters
- envoy_params = config.get('params', {})
+ envoy_params = config.as_dict().get('PARAMS', {})
# Build optional plugin components
optional_plugins_section = config.get('optional_plugin_components')
@@ -93,18 +96,30 @@ def start_(shard_name, director_host, director_port, tls, envoy_config_path,
module = import_module(module_path)
instance = getattr(module, class_name)(**plugin_params)
envoy_params[plugin_name] = instance
-
# Instantiate Shard Descriptor
- shard_descriptor = shard_descriptor_from_config(config.get('shard_descriptor', {}))
+ shard_descriptor_config = config.as_dict().get('SHARD_DESCRIPTOR', {})
+ shard_descriptor = shard_descriptor_from_config(shard_descriptor_config)
+ docker_config = envoy_params.pop('docker', {})
+ docker_env = docker_config.get('env', {})
+ docker_buildargs = docker_config.get('buildargs', {})
+ docker_volumes = docker_config.get('volumes', [])
+ docker_config = DockerConfig(
+ use_docker=use_docker,
+ env=docker_env,
+ buildargs=docker_buildargs,
+ volumes=docker_volumes,
+ )
envoy = Envoy(
shard_name=shard_name,
director_host=director_host,
director_port=director_port,
tls=tls,
shard_descriptor=shard_descriptor,
+ shard_descriptor_config=shard_descriptor_config,
root_certificate=config.root_certificate,
private_key=config.private_key,
certificate=config.certificate,
+ docker_config=docker_config,
**envoy_params
)
diff --git a/openfl/interface/interactive_api/experiment.py b/openfl/interface/interactive_api/experiment.py
index 73f02f8ef8..bef763590b 100644
--- a/openfl/interface/interactive_api/experiment.py
+++ b/openfl/interface/interactive_api/experiment.py
@@ -286,7 +286,7 @@ def _pack_the_workspace():
from os import makedirs
from os.path import basename
- archive_type = 'zip'
+ archive_type = 'tar'
archive_name = basename(getcwd())
tmp_dir = 'temp_' + archive_name
diff --git a/openfl/protocols/aggregator.proto b/openfl/protocols/aggregator.proto
index 9d995dbc6b..d04f2c45d3 100644
--- a/openfl/protocols/aggregator.proto
+++ b/openfl/protocols/aggregator.proto
@@ -15,7 +15,8 @@ service Aggregator {
rpc GetMetricStream(GetMetricStreamRequest) returns (stream GetMetricStreamResponse) {}
rpc GetTrainedModel(GetTrainedModelRequest) returns (TrainedModelResponse) {}
rpc GetExperimentDescription(GetExperimentDescriptionRequest)
- returns (GetExperimentDescriptionResponse) {}
+ returns (GetExperimentDescriptionResponse) {}
+ rpc Stop(StopRequest) returns (StopResponse) {}
}
message MessageHeader {
@@ -50,7 +51,7 @@ message GetAggregatedTensorRequest {
string tensor_name = 2;
int32 round_number = 3;
bool report = 4;
- repeated string tags = 5;
+ repeated string tags = 5;
bool require_lossless = 6;
}
@@ -66,7 +67,7 @@ message TaskResults {
MessageHeader header = 1;
int32 round_number = 2;
string task_name = 3;
- int32 data_size = 4;
+ int32 data_size = 4;
repeated NamedTensor tensors = 5;
}
@@ -108,3 +109,9 @@ message GetExperimentDescriptionRequest {}
message GetExperimentDescriptionResponse {
ExperimentDescription experiment = 1;
}
+
+message StopRequest {
+ string failed_collaborator = 1;
+}
+
+message StopResponse {}
diff --git a/openfl/protocols/director.proto b/openfl/protocols/director.proto
index 6e6922db1c..69d8d5b874 100644
--- a/openfl/protocols/director.proto
+++ b/openfl/protocols/director.proto
@@ -10,7 +10,8 @@ import "google/protobuf/duration.proto";
import "openfl/protocols/base.proto";
service Director {
- // 1. Envoy RPCs
+ // 1. Services RPCs
+ // 1.1 Envoy RPCs
rpc UpdateShardInfo(UpdateShardInfoRequest) returns (UpdateShardInfoResponse) {}
// Shard owner could also provide some public data for tests
rpc WaitExperiment(stream WaitExperimentRequest) returns (stream WaitExperimentResponse) {}
@@ -18,10 +19,14 @@ service Director {
rpc UpdateEnvoyStatus(UpdateEnvoyStatusRequest) returns (UpdateEnvoyStatusResponse) {}
rpc SetExperimentFailed (SetExperimentFailedRequest) returns (SetExperimentFailedResponse) {}
+ // 1.2 Aggregator RPCs
+ rpc UploadExperimentModel(UploadExperimentModelRequest)
+ returns (UploadExperimentModelResponse) {}
+
// 2. Frontend RPCs
// 2.1 Extension RPCs
rpc GetExperimentDescription(GetExperimentDescriptionRequest)
- returns (GetExperimentDescriptionResponse) {}
+ returns (GetExperimentDescriptionResponse) {}
rpc GetExperimentsList(GetExperimentsListRequest) returns (GetExperimentsListResponse){}
// 2.2 API RPCs
@@ -43,7 +48,7 @@ message CudaDeviceInfo {
string name = 7;
}
-// Envoy Messages
+// Services. Envoy Messages
message NodeInfo {
string name = 1;
@@ -104,6 +109,20 @@ message SetExperimentFailedRequest {
message SetExperimentFailedResponse {}
+// Services. Aggregator messages.
+
+message UploadExperimentModelRequest {
+ enum ModelType {
+ LAST_BEST_MODEL = 0;
+ LAST_MODEL = 1;
+ }
+ ModelProto model_proto = 1;
+ ModelType model_type = 2;
+ string experiment_name = 3;
+}
+
+message UploadExperimentModelResponse {}
+
// Frontend. Extension Messages.
message GetExperimentsListRequest {}
diff --git a/openfl/transport/grpc/aggregator_client.py b/openfl/transport/grpc/aggregator_client.py
index 61e19ad48e..0d5ae603ae 100644
--- a/openfl/transport/grpc/aggregator_client.py
+++ b/openfl/transport/grpc/aggregator_client.py
@@ -10,6 +10,7 @@
from typing import Tuple
import grpc
+from google.protobuf.json_format import MessageToDict
from openfl.pipelines import NoCompressionPipeline
from openfl.protocols import aggregator_pb2
@@ -536,4 +537,9 @@ async def get_experiment_description(self):
"""Get experiment info."""
request = aggregator_pb2.GetExperimentDescriptionRequest()
response = await self.stub.GetExperimentDescription(request)
- return response.experiment
+ return MessageToDict(response.experiment)
+
+ async def stop(self, failed_collaborator: str = None):
+ request = aggregator_pb2.StopRequest(failed_collaborator=failed_collaborator)
+ response = await self.stub.Stop(request)
+ return response
diff --git a/openfl/transport/grpc/aggregator_server.py b/openfl/transport/grpc/aggregator_server.py
index b0bf81834e..1c6e7b3609 100644
--- a/openfl/transport/grpc/aggregator_server.py
+++ b/openfl/transport/grpc/aggregator_server.py
@@ -333,6 +333,9 @@ def GetExperimentDescription(self, request, context): # NOQA:N802
),
)
+ def Stop(self, request, context): # NOQA:N802
+ self.aggregator.stop(failed_collaborator=request.failed_collaborator)
+
def get_server(self):
"""Return gRPC server."""
self.server = server(ThreadPoolExecutor(max_workers=cpu_count() + 1),
diff --git a/openfl/transport/grpc/director_client.py b/openfl/transport/grpc/director_client.py
index 5a873a129a..912cc113f7 100644
--- a/openfl/transport/grpc/director_client.py
+++ b/openfl/transport/grpc/director_client.py
@@ -309,3 +309,22 @@ def get_experiment_description(self, name):
director_pb2.GetExperimentDescriptionRequest(name=name)
)
return response.experiment
+
+ def upload_experiment_model(self, experiment_name: str, model_proto, model_type: str = 'last'):
+ if model_type == 'best':
+ model_type = director_pb2.UploadExperimentModelRequest.LAST_BEST_MODEL
+ elif model_type == 'last':
+ model_type = director_pb2.UploadExperimentModelRequest.LAST_MODEL
+ else:
+ raise ValueError(
+ f'Invalid {model_type=} for upload_experiment_model function. '
+ f'Allowed values: "last", "best"'
+ )
+ response = self.stub.UploadExperimentModel(
+ director_pb2.UploadExperimentModelRequest(
+ model_proto=model_proto,
+ model_type=model_type,
+ experiment_name=experiment_name,
+ )
+ )
+ return response
diff --git a/openfl/transport/grpc/director_server.py b/openfl/transport/grpc/director_server.py
index 6c2d5432c7..9040faadc7 100644
--- a/openfl/transport/grpc/director_server.py
+++ b/openfl/transport/grpc/director_server.py
@@ -15,9 +15,11 @@
from grpc import aio
from grpc import ssl_server_credentials
+from openfl.docker.docker import DockerConfig
from openfl.pipelines import NoCompressionPipeline
from openfl.protocols import director_pb2
from openfl.protocols import director_pb2_grpc
+from openfl.protocols import utils
from openfl.protocols.utils import construct_model_proto
from openfl.protocols.utils import deconstruct_model_proto
from openfl.protocols.utils import get_headers
@@ -39,6 +41,7 @@ def __init__(
certificate: Optional[Union[Path, str]] = None,
listen_host: str = '[::]',
listen_port: int = 50051,
+ docker_config: DockerConfig,
**kwargs,
) -> None:
"""Initialize a director object."""
@@ -58,6 +61,9 @@ def __init__(
root_certificate=self.root_certificate,
private_key=self.private_key,
certificate=self.certificate,
+ director_host=listen_host,
+ director_port=listen_port,
+ docker_config=docker_config,
**kwargs
)
@@ -160,11 +166,18 @@ async def GetTrainedModel(self, request, context): # NOQA:N802
logger.info('Request GetTrainedModel has got!')
caller = self.get_caller(context)
+ model_type_enum = director_pb2.UploadExperimentModelRequest.ModelType
+ if request.model_type == model_type_enum.LAST_MODEL:
+ model_type = 'last'
+ elif request.model_type == model_type_enum.LAST_BEST_MODEL:
+ model_type = 'best'
+ else:
+ raise ValueError(f'Invalid {request.model_type=} in GetTrainedModel function.')
trained_model_dict = await self.director.get_trained_model(
experiment_name=request.experiment_name,
caller=caller,
- model_type=request.model_type
+ model_type=model_type
)
if trained_model_dict is None:
@@ -243,7 +256,7 @@ async def SetExperimentFailed(self, request, context): # NOQA:N802
logger.error(f'Collaborator {request.collaborator_name} was failed with error code:'
f' {request.error_code}, error_description: {request.error_description}'
f'Stopping experiment.')
- self.director.set_experiment_failed(
+ await self.director.set_experiment_failed(
experiment_name=request.experiment_name,
collaborator_name=request.collaborator_name
)
@@ -303,3 +316,21 @@ async def GetExperimentDescription(self, request, context): # NOQA:N802
experiment = await self.director.get_experiment_description(caller, request.name)
return director_pb2.GetExperimentDescriptionResponse(experiment=experiment)
+
+ async def UploadExperimentModel(self, request, context): # NOQA:N802
+ """Upload an experiment model from aggregator."""
+ model_type_enum = director_pb2.UploadExperimentModelRequest.ModelType
+ if request.model_type == model_type_enum.LAST_MODEL:
+ model_type = 'last'
+ elif request.model_type == model_type_enum.LAST_BEST_MODEL:
+ model_type = 'best'
+ else:
+ raise ValueError('Invalid model_type value in UploadExperimentModel function.')
+ tensor_dict, _ = utils.deconstruct_model_proto(
+ request.model_proto, compression_pipeline=NoCompressionPipeline())
+ await self.director.upload_experiment_model(
+ experiment_name=request.experiment_name,
+ tensor_dict=tensor_dict,
+ model_type=model_type,
+ )
+ return director_pb2.UploadExperimentModelResponse()
diff --git a/openfl/utilities/workspace.py b/openfl/utilities/workspace.py
index 08847f76aa..5c367ab6d0 100644
--- a/openfl/utilities/workspace.py
+++ b/openfl/utilities/workspace.py
@@ -61,7 +61,7 @@ def __enter__(self):
shutil.rmtree(self.experiment_work_dir)
os.makedirs(self.experiment_work_dir)
- shutil.unpack_archive(self.data_file_path, self.experiment_work_dir, format='zip')
+ shutil.unpack_archive(self.data_file_path, self.experiment_work_dir, format='tar')
if self.is_install_requirements:
self._install_requirements()
@@ -71,13 +71,15 @@ def __enter__(self):
# This is needed for python module finder
sys.path.append(self.experiment_work_dir)
+ return self.experiment_work_dir
+
def __exit__(self, exc_type, exc_value, traceback):
"""Remove the workspace."""
os.chdir(self.cwd)
shutil.rmtree(self.experiment_name, ignore_errors=True)
if self.experiment_work_dir in sys.path:
sys.path.remove(self.experiment_work_dir)
- os.remove(self.data_file_path)
+ self.data_file_path.unlink(missing_ok=True)
def dump_requirements_file(
@@ -93,7 +95,7 @@ def dump_requirements_file(
if prefixes is None:
prefixes = set()
elif type(prefixes) is str:
- prefixes = set(prefixes,)
+ prefixes = {prefixes}
else:
prefixes = set(prefixes)
diff --git a/setup.py b/setup.py
index 1e79a0b422..1a07112b5f 100644
--- a/setup.py
+++ b/setup.py
@@ -84,6 +84,7 @@ def run(self):
'openfl.component.envoy',
'openfl.cryptography',
'openfl.databases',
+ 'openfl.docker',
'openfl.federated',
'openfl.federated.data',
'openfl.federated.plan',
@@ -113,6 +114,7 @@ def run(self):
],
include_package_data=True,
install_requires=[
+ 'aiodocker==0.21.0',
'Click==8.0.1',
'PyYAML>=5.4.1',
'cloudpickle',
@@ -127,7 +129,7 @@ def run(self):
'pandas',
'protobuf',
'requests',
- 'rich==9.1.0',
+ 'rich==12.0.1',
'scikit-learn',
'tensorboard',
'tensorboardX',