Skip to content

Commit

Permalink
Resolved flake8 issues,
Browse files Browse the repository at this point in the history
Added warning statements when data.yaml is not found or aggregator / collaborator private attributes are not found in data.yaml,
Tested by running all tutorials and tests

Signed-off-by: Parth Mandaliya <parthx.mandaliya@intel.com>
  • Loading branch information
ParthM-GitHub committed Sep 28, 2023
1 parent a2ceb05 commit de2e333
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 173 deletions.
20 changes: 11 additions & 9 deletions openfl/experimental/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pickle
import inspect
from threading import Event
from copy import deepcopy
from logging import getLogger
from typing import Any, Callable
from typing import Dict, List, Tuple
Expand All @@ -29,7 +28,8 @@ class Aggregator:
flow (Any): Flow class.
rounds_to_train (int): External loop rounds.
checkpoint (bool): Whether to save checkpoint or noe (default=False).
private_attrs_callable (Callable): Function for Aggregator private attriubtes (default=None).
private_attrs_callable (Callable): Function for Aggregator private attriubtes
(default=None).
private_attrs_kwargs (Dict): Arguments to call private_attrs_callable (default={}).
Returns:
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(
self.flow = flow
self.checkpoint = checkpoint
self.flow._foreach_methods = []
self.logger.info(f"MetaflowInterface creation.")
self.logger.info("MetaflowInterface creation.")
self.flow._metaflow_interface = MetaflowInterface(
self.flow.__class__, "single_process"
)
Expand All @@ -107,7 +107,7 @@ def __init__(

def __initialize_private_attributes(self, kwargs: Dict) -> None:
"""
Call private_attrs_callable function set
Call private_attrs_callable function set
attributes to self.__private_attrs.
"""
self.__private_attrs = self.__private_attrs_callable(
Expand Down Expand Up @@ -182,8 +182,9 @@ def run_flow(self) -> None:

while not self.collaborator_task_results.is_set():
# Waiting for selected collaborators to send the results.
self.logger.info(f"Waiting for "
+ f"{self.collaborators_counter}/{len(self.selected_collaborators)}"
len_sel_collabs = len(self.selected_collaborators)
self.logger.info("Waiting for "
+ f"{self.collaborators_counter}/{len_sel_collabs}"
+ " collaborators to send results.")
time.sleep(Aggregator._get_sleep_time())

Expand All @@ -198,7 +199,8 @@ def call_checkpoint(self, ctx: Any, f: Callable, stream_buffer: bytes = None) ->
Perform checkpoint task.
Args:
ctx (FLSpec / bytes): Collaborator FLSpec object for which checkpoint is to be performed.
ctx (FLSpec / bytes): Collaborator FLSpec object for which checkpoint is to be
performed.
f (Callable / bytes): Collaborator Step (Function) which is to be checkpointed.
stream_buffer (bytes): Captured object for output and error (default=None).
reserved_attributes (List[str]): List of attribute names which is to be excluded
Expand Down Expand Up @@ -346,8 +348,8 @@ def do_task(self, f_name: str) -> Any:
if aggregator_to_collaborator(f, parent_func):
# TODO: Add a comment here
if len(self.flow.execute_task_args) > 4:
self.clones_dict, self.instance_snapshot, self.kwargs = \
self.flow.execute_task_args[3:]
temp = self.flow.execute_task_args[3:]
self.clones_dict, self.instance_snapshot, self.kwargs = temp

self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
Expand Down
18 changes: 9 additions & 9 deletions openfl/experimental/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class Collaborator:
aggregator_uuid (str): The unique id for the client.
federation_uuid (str): The unique id for the federation.
client (AggregatorGRPCClient): GRPC Client to connect to
client (AggregatorGRPCClient): GRPC Client to connect to
Aggregator Server.
private_attrs_callable (Callable): Function for Collaborator
private_attrs_callable (Callable): Function for Collaborator
private attriubtes.
private_attrs_kwargs (Dict): Arguments to call private_attrs_callable.
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self,

def __initialize_private_attributes(self, kwrags: Dict) -> None:
"""
Call private_attrs_callable function set
Call private_attrs_callable function set
attributes to self.__private_attrs
Args:
Expand All @@ -72,7 +72,7 @@ def __set_attributes_to_clone(self, clone: Any) -> None:
Set private_attrs to clone as attributes.
Args:
clone (FLSpec): Clone to which private attributes are to be
clone (FLSpec): Clone to which private attributes are to be
set
Returns:
Expand All @@ -88,7 +88,7 @@ def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) ->
transition from Aggregator step to collaborator steps
Args:
clone (FLSpec): Clone from which private attributes are to be
clone (FLSpec): Clone from which private attributes are to be
removed
Returns:
Expand Down Expand Up @@ -157,8 +157,8 @@ def send_task_results(self, next_step: str, clone: Any) -> None:
Returns:
None
"""
self.logger.info(
f'Round {self.round_number}, collaborator {self.name} is sending results.')
self.logger.info(f'Round {self.round_number}, '
f'collaborator {self.name} is sending results.')
self.client.send_task_results(
self.name, self.round_number,
next_step, pickle.dumps(clone)
Expand All @@ -178,8 +178,8 @@ def get_tasks(self) -> Tuple:
time_to_quit (bool): True if end of reached
"""
self.logger.info('Waiting for tasks...')
self.round_number, next_step, clone_bytes, sleep_time, \
time_to_quit = self.client.get_tasks(self.name)
temp = self.client.get_tasks(self.name)
self.round_number, next_step, clone_bytes, sleep_time, time_to_quit = temp

return next_step, pickle.loads(clone_bytes), sleep_time, time_to_quit

Expand Down
53 changes: 23 additions & 30 deletions openfl/experimental/federated/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,6 @@ def parse(

plan.authorized_cols = Plan.load(cols_config_path).get("collaborators", [])

# TODO: Does this need to be a YAML file? Probably want to use key
# value as the plan hash
plan.cols_data_paths = {}
if data_config_path is not None:
with open(data_config_path, "r", encoding="utf-8") as f:
# TODO:need to replace with load
plan.cols_data_paths = safe_load(f)

if resolve:
plan.resolve()

Expand Down Expand Up @@ -260,7 +252,8 @@ def get_aggregator(self):
defaults[SETTINGS]["private_attributes_kwargs"] = private_attrs_kwargs

defaults[SETTINGS]["flow"] = self.get_flow()
defaults[SETTINGS]["checkpoint"] = self.config.get("federated_flow")["settings"]["checkpoint"]
defaults[SETTINGS]["checkpoint"] = self.config.get(
"federated_flow")["settings"]["checkpoint"]

log_metric_callback = defaults[SETTINGS].get("log_metric_callback")
if log_metric_callback:
Expand Down Expand Up @@ -418,38 +411,38 @@ def import_kwargs_modules(self, defaults):
defaults[SETTINGS][key] = Plan.build(
**value_defaults_data
)
except:
except Exception:
raise ImportError(f"Cannot import {value_defaults}.")
return defaults

def get_private_attr(self, private_attr_name=None):
private_attrs_callable = None
private_attrs_kwargs = {}

from os.path import isfile
import os
from openfl.experimental.federated.plan import Plan
from pathlib import Path

data_yaml = "plan/data.yaml"

if isfile(data_yaml):
if os.path.exists(data_yaml) and os.path.isfile(data_yaml):
d = Plan.load(Path(data_yaml).absolute())

if d.get(private_attr_name):
private_attrs_callable = {
"template": d.get(private_attr_name)["callable_func"]["template"]
}

private_attrs_kwargs = self.import_kwargs_modules(
d.get(private_attr_name)["callable_func"]
)["settings"]

if private_attrs_callable:
if isinstance(private_attrs_callable, dict):
private_attrs_callable = Plan.import_(**private_attrs_callable)
elif not callable(private_attrs_callable):
raise TypeError(
f"private_attrs_callable should be callable object "
f"or be import from code part, get {private_attrs_callable}"
)
return private_attrs_callable, private_attrs_kwargs
if d.get(private_attr_name, None):
private_attrs_callable = {
"template": d.get(private_attr_name)["callable_func"]["template"]
}

private_attrs_kwargs = self.import_kwargs_modules(
d.get(private_attr_name)["callable_func"]
)["settings"]

if isinstance(private_attrs_callable, dict):
private_attrs_callable = Plan.import_(**private_attrs_callable)
elif not callable(private_attrs_callable):
raise TypeError(
f"private_attrs_callable should be callable object "
f"or be import from code part, get {private_attrs_callable}"
)
return private_attrs_callable, private_attrs_kwargs
return None, None
19 changes: 10 additions & 9 deletions openfl/experimental/interface/cli/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""Aggregator module."""

import sys
import time
import threading
from logging import getLogger

Expand All @@ -27,6 +26,7 @@ def aggregator(context):
"""Manage Federated Learning Aggregator."""
context.obj['group'] = 'aggregator'


@aggregator.command(name='start')
@option('-p', '--plan', required=False,
help='Federated learning plan [plan/plan.yaml]',
Expand Down Expand Up @@ -54,20 +54,21 @@ def start_(plan, authorized_cols, secure):
plan = Plan.parse(plan_config_path=Path(plan).absolute(),
cols_config_path=Path(authorized_cols).absolute())

# if not os.path.exists('plan/data.yaml'):
# logger.warning('🧿 Starting the Aggregator Service without private .')
# else:
logger.info('🧿 Starting the Aggregator Service.')

if not os.path.exists('plan/data.yaml'):
logger.warning('Aggregator private attributes are set to None as not plan/data.yaml found in workspace.')
logger.warning(
'Aggregator private attributes are set to None as plan/data.yaml not found'
+ ' in workspace.')
else:
import yaml
from yaml.loader import SafeLoader
with open('plan/data.yaml') as f:
data = yaml.load(f, Loader=SafeLoader)
if data.get("aggregator", None) == None:
logger.warning('Aggregator private attributes are set to None as aggregator section in plan/data.yaml is not mentioned.')
with open('plan/data.yaml', 'r') as f:
data = yaml.load(f, Loader=SafeLoader)
if data.get("aggregator", None) is None:
logger.warning(
'Aggregator private attributes are set to None as no aggregator'
+ ' attributes found in plan/data.yaml.')

agg_server = plan.get_server()
agg_server.is_server_started = False
Expand Down
10 changes: 9 additions & 1 deletion openfl/experimental/interface/cli/cli_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def pretty(o):
m = max(map(len, o.keys()))

for k, v in o.items():
echo(style(f'{k:<{m}} : ', fg='blue') + style(f'{v}', fg='cyan'))
echo(style(f'{k:<{m}} : ', fg='blue') + style(f'{v}', fg='cyan'))


def tree(path):
"""Print current directory file tree."""
Expand All @@ -43,6 +44,7 @@ def tree(path):
else:
echo(f'{space}d {path.name}')


def print_tree(dir_path: Path, level: int = -1,
limit_to_directories: bool = False,
length_limit: int = 1000):
Expand Down Expand Up @@ -86,6 +88,7 @@ def inner(dir_path: Path, prefix: str = '', level=-1):
echo(f'... length_limit, {length_limit}, reached, counted:')
echo(f'\n{directories} directories' + (f', {files} files' if files else ''))


def copytree(src, dst, symlinks=False, ignore=None,
ignore_dangling_symlinks=False, dirs_exist_ok=False):
"""From Python 3.8 'shutil' which include 'dirs_exist_ok' option."""
Expand Down Expand Up @@ -155,6 +158,7 @@ def _copytree():

return _copytree()


def get_workspace_parameter(name):
"""Get a parameter from the workspace config file (.workspace)."""
# Update the .workspace file to show the current workspace plan
Expand All @@ -171,6 +175,7 @@ def get_workspace_parameter(name):
else:
return doc[name]


def check_varenv(env: str = '', args: dict = None):
"""Update "args" (dictionary) with <env: env_value> if env has a defined value in the host."""
if args is None:
Expand All @@ -181,6 +186,7 @@ def check_varenv(env: str = '', args: dict = None):

return args


def get_fx_path(curr_path=''):
"""Return the absolute path to fx binary."""
import re
Expand All @@ -194,6 +200,7 @@ def get_fx_path(curr_path=''):

return fx_path


def remove_line_from_file(pkg, filename):
"""Remove line that contains `pkg` from the `filename` file."""
with open(filename, 'r+', encoding='utf-8') as f:
Expand All @@ -204,6 +211,7 @@ def remove_line_from_file(pkg, filename):
f.write(i)
f.truncate()


def replace_line_in_file(line, line_num_to_replace, filename):
"""Replace line at `line_num_to_replace` with `line`."""
with open(filename, 'r+', encoding='utf-8') as f:
Expand Down
Loading

0 comments on commit de2e333

Please sign in to comment.