Skip to content

Commit

Permalink
Automated GitHub Actions Test for gRPC Training (#148)
Browse files Browse the repository at this point in the history
* added MPI Communication class

* added send thread, merged 2 classes

* improved comments

* testing mpi, model weights not acquired

* mpi works, occasional deadlock issue

* merged send and listener threads

* first draft of test

* using python3.10

* made testing sys and algo configs

* testing workflow

* predict next move ish

* moved quorum send

* moved quorum send

* using traditional fl algo

* run test only during push to main

* new dump_dir

* remove send_status from proto

* changed dump_dir

* small changes
  • Loading branch information
kathrynle20 authored Dec 1, 2024
1 parent 5e960ab commit cd614bb
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 21 deletions.
56 changes: 56 additions & 0 deletions .github/workflows/train.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: Test Training Code with gRPC

# Triggers: manual dispatch, pushes to branches matching "*", and pull requests targeting main.
# NOTE(review): the main-only push filter is commented out below, so pushes to other
# branches also start this job - confirm this is intended.
on:
workflow_dispatch:
push:
branches:
# - main
- "*"
pull_request:
branches:
- main

# NOTE(review): GitHub documents ACTIONS_STEP_DEBUG as a repository secret/variable;
# setting it through workflow-level env may not enable step debug logging - TODO confirm.
env:
ACTIONS_STEP_DEBUG: true

jobs:
train-check:
runs-on: ubuntu-latest

steps:
# Step 1: Checkout the code
- name: Checkout repository
uses: actions/checkout@v3

# Step 2: Set up Python
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10" # Specify the Python version you're using

# Step 3: Install dependencies
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y libopenmpi-dev openmpi-bin
sudo apt-get install -y libgl1 libglib2.0-0
pip install -r requirements.txt
# Step 4: Run gRPC server and client
- name: Run test
run: |
cd src
# chmod +x ./configs/algo_config_test.py
echo "starting main grpc"
python main_grpc.py -n 4 -host localhost
echo "starting main"
python main.py -super true -s "./configs/sys_config_test.py"
echo "done"
# further checks:
# only 5 rounds
# gRPC only? or also MPI?
# num of samples
# num users and nodes
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"python.analysis.typeCheckingMode": "strict"
}
"python.analysis.typeCheckingMode": "strict"
}
1 change: 0 additions & 1 deletion src/algos/fl.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def fed_avg(self, model_wts: List[OrderedDict[str, Tensor]]):
num_users = len(model_wts)
coeff = 1 / num_users
avgd_wts: OrderedDict[str, Tensor] = OrderedDict()

for key in model_wts[0].keys():
avgd_wts[key] = sum(coeff * m[key] for m in model_wts) # type: ignore

Expand Down
3 changes: 2 additions & 1 deletion src/configs/algo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
"algo": "fedstatic",
"topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore
"rounds": 3,

# Model parameters
"optimizer": "sgd", # TODO comment out for real training
"model": "resnet10",
Expand Down Expand Up @@ -362,4 +361,6 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
malicious_traditional_model_update_attack,
]


default_config_list: List[ConfigType] = [traditional_fl]
# default_config_list: List[ConfigType] = [fedstatic, fedstatic, fedstatic, fedstatic]
26 changes: 26 additions & 0 deletions src/configs/algo_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from utils.types import ConfigType

# fedstatic: ConfigType = {
# # Collaboration setup
# "algo": "fedstatic",
# "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore
# "rounds": 1,

# # Model parameters
# "model": "resnet10",
# "model_lr": 3e-4,
# "batch_size": 256,
# }

# Algorithm config for the test setup: a single round of "fedavg" on resnet10.
# Imported by sys_config_test, which assigns it to every node.
traditional_fl: ConfigType = {
# Collaboration setup
"algo": "fedavg",
"rounds": 1,

# Model parameters
"model": "resnet10",
"model_lr": 3e-4,
"batch_size": 256,
}

# default_config_list: List[ConfigType] = [fedstatic, fedstatic, fedstatic, fedstatic]
15 changes: 9 additions & 6 deletions src/configs/sys_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
CIAR10_DPATH = "./datasets/imgs/cifar10/"

NUM_COLLABORATORS = 1
DUMP_DIR = "/mas/camera/Experiments/SONAR/abhi/"
# DUMP_DIR = "../../../../../../../home/"
DUMP_DIR = "/tmp/"

num_users = 3
mpi_system_config: ConfigType = {
"exp_id": "",
"comm": {"type": "MPI"},
"num_users": num_users,
"num_collaborators": NUM_COLLABORATORS,
"dset": CIFAR10_DSET,
"dump_dir": DUMP_DIR,
Expand All @@ -177,11 +180,9 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
# "algo": get_algo_configs(num_users=3, algo_configs=algo_configs_list),
"algos": get_algo_configs(
num_users=3,
algo_configs=malicious_algo_config_list,
assignment_method="distribution",
distribution={0: 1, 1: 1, 2: 1},
algo_configs=default_config_list
), # type: ignore
"samples_per_user": 1000, # TODO: To model scenarios where different users have different number of samples
"samples_per_user": 5555, # TODO: To model scenarios where different users have different number of samples
# we need to make this a dictionary with user_id as key and number of samples as value
"train_label_distribution": "iid", # Either "iid", "non_iid" "support"
"test_label_distribution": "iid", # Either "iid", "non_iid" "support"
Expand Down Expand Up @@ -348,7 +349,8 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
"device_ids": get_device_ids(num_users, gpu_ids),
# "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore
"algos": get_algo_configs(num_users=num_users, algo_configs=[fedstatic]), # type: ignore
"samples_per_user": 50000 // num_users, # distributed equally
# "samples_per_user": 50000 // num_users, # distributed equally
"samples_per_user": 100,
"train_label_distribution": "non_iid",
"test_label_distribution": "iid",
"alpha_data": 1.0,
Expand Down Expand Up @@ -389,3 +391,4 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):

current_config = grpc_system_config
# current_config = mpi_system_config

126 changes: 126 additions & 0 deletions src/configs/sys_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Dict, List, Literal, Optional
import random
from utils.types import ConfigType

from .algo_config_test import (
traditional_fl
)

def get_device_ids(num_users: int, gpus_available: List[int | Literal["cpu"]]) -> Dict[str, List[int | Literal["cpu"]]]:
    """
    Get the device IDs for the super-node and the users.

    Devices are handed out round-robin over ``gpus_available``: ``node_i``
    receives ``gpus_available[i % len(gpus_available)]``, wrapped in a
    single-element list.

    Args:
        num_users: Number of user nodes; ``num_users + 1`` entries are
            produced because ``node_0`` is the super-node.
        gpus_available: Device identifiers to cycle through (GPU indices
            or the literal ``"cpu"``).

    Returns:
        Mapping from ``"node_<i>"`` to a one-element list with its device.

    Raises:
        ValueError: If ``gpus_available`` is empty (previously this surfaced
            as an opaque ZeroDivisionError from the modulo).
    """
    # TODO: Make it multi-host
    if not gpus_available:
        raise ValueError("gpus_available must contain at least one device id")
    return {
        f"node_{i}": [gpus_available[i % len(gpus_available)]]
        for i in range(num_users + 1)  # +1 for the super-node
    }


def get_algo_configs(
    num_users: int,
    algo_configs: List[ConfigType],
    assignment_method: Literal[
        "sequential", "random", "mapping", "distribution"
    ] = "sequential",
    seed: Optional[int] = 1,
    mapping: Optional[List[int]] = None,
    distribution: Optional[Dict[int, int]] = None,
) -> Dict[str, ConfigType]:
    """
    Assign an algorithm configuration to each node, allowing for repetition.

    ``node_0`` (the super-node) always receives ``algo_configs[0]``; the user
    nodes ``node_1`` .. ``node_<num_users>`` are filled according to
    ``assignment_method``:

    sequential: node ``i`` gets ``algo_configs[i % len(algo_configs)]``
        (so with several configs, ``node_1`` gets index 1, not index 0).
    random: each node gets ``random.choice(algo_configs)`` from the global RNG.
    mapping: node ``i`` gets ``algo_configs[mapping[i - 1]]``.
    distribution: ``distribution`` maps algo index -> number of nodes; node
        indices are shuffled with ``seed`` and handed out in that order.

    Args:
        num_users: Number of user nodes (excluding the super-node).
        algo_configs: Candidate configurations; must not be empty.
        assignment_method: One of the four strategies above.
        seed: Seed used by the "distribution" strategy only.
        mapping: Required for "mapping"; one algo index per user node.
        distribution: Required for "distribution"; counts must sum to
            ``num_users``.

    Returns:
        Mapping from ``"node_<i>"`` to its assigned configuration.

    Raises:
        ValueError: On an empty ``algo_configs``, an unknown method, a missing
            or wrongly sized ``mapping``, or a missing ``distribution`` or one
            whose counts do not sum to ``num_users``.
    """
    if not algo_configs:
        raise ValueError("algo_configs must contain at least one configuration")

    algo_config_map: Dict[str, ConfigType] = {"node_0": algo_configs[0]}  # Super-node

    if assignment_method == "sequential":
        for i in range(1, num_users + 1):
            algo_config_map[f"node_{i}"] = algo_configs[i % len(algo_configs)]
    elif assignment_method == "random":
        for i in range(1, num_users + 1):
            algo_config_map[f"node_{i}"] = random.choice(algo_configs)
    elif assignment_method == "mapping":
        if not mapping:
            raise ValueError("Mapping must be provided for assignment method 'mapping'")
        # Explicit raise instead of assert: asserts disappear under `python -O`.
        if len(mapping) != num_users:
            raise ValueError(
                f"Mapping has {len(mapping)} entries but num_users is {num_users}"
            )
        for i, algo_index in enumerate(mapping, start=1):
            algo_config_map[f"node_{i}"] = algo_configs[algo_index]
    elif assignment_method == "distribution":
        if not distribution:
            raise ValueError(
                "Distribution must be provided for assignment method 'distribution'"
            )
        total_users = sum(distribution.values())
        if total_users != num_users:
            raise ValueError(
                f"Distribution covers {total_users} nodes but num_users is {num_users}"
            )

        # Shuffle the user-node indices reproducibly. The module-level RNG is
        # seeded on purpose so the resulting order (and the RNG state seen by
        # later code) stays the same as before.
        node_indices = list(range(1, total_users + 1))
        random.seed(seed)
        random.shuffle(node_indices)

        # Hand out the shuffled indices in order, num_nodes at a time.
        shuffled = iter(node_indices)
        for algo_index, num_nodes in distribution.items():
            for _ in range(num_nodes):
                algo_config_map[f"node_{next(shuffled)}"] = algo_configs[algo_index]
    else:
        raise ValueError(f"Invalid assignment method: {assignment_method}")
    return algo_config_map

# Dataset identifier and on-disk path used by the test system config below.
CIFAR10_DSET = "cifar10"
# NOTE(review): "CIAR10" looks like a misspelling of "CIFAR10"; the name is kept
# because the config below refers to it.
CIAR10_DPATH = "./datasets/imgs/cifar10/"

# DUMP_DIR = "../../../../../../../home/"
# Directory passed to the config as "dump_dir".
DUMP_DIR = "/tmp/"

NUM_COLLABORATORS = 1
# Number of user nodes; node_0 (the super-node) is counted separately.
num_users = 4

# Dropout settings applied to every user node; a rate of 0.0 is used here.
dropout_dict = {
"distribution_dict": { # leave dict empty to disable dropout
"method": "uniform", # "uniform", "normal"
"parameters": {} # "mean": 0.5, "std": 0.1 in case of normal distribution
},
"dropout_rate": 0.0, # cutoff for dropout: [0,1]
"dropout_correlation": 0.0, # correlation between dropouts of successive rounds: [0,1]
}

# node_0 gets an empty dict; every user node is given the SAME dropout_dict
# object (not a copy), so an in-place change would be visible to all of them.
dropout_dicts = {"node_0": {}}
for i in range(1, num_users + 1):
dropout_dicts[f"node_{i}"] = dropout_dict

# NOTE(review): hard-coded GPU indices. This config is run by the CI workflow on a
# hosted runner, which presumably has no GPUs - confirm the training code falls back
# to CPU, or pass "cpu" (accepted by get_device_ids' signature) instead.
gpu_ids = [2, 3, 5, 6]

# System config for the gRPC test run: 4 users, 100 training samples each.
grpc_system_config: ConfigType = {
"exp_id": "static",
"num_users": num_users,
"num_collaborators": NUM_COLLABORATORS,
"comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["localhost:50048"]}, # The super-node
"dset": CIFAR10_DSET,
"dump_dir": DUMP_DIR,
"dpath": CIAR10_DPATH,
"seed": 2,
"device_ids": get_device_ids(num_users, gpu_ids),
# "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore
"algos": get_algo_configs(num_users=num_users, algo_configs=[traditional_fl]), # type: ignore
# "samples_per_user": 50000 // num_users, # distributed equally
"samples_per_user": 100,
"train_label_distribution": "non_iid",
"test_label_distribution": "iid",
"alpha_data": 1.0,
"exp_keys": [],
"dropout_dicts": dropout_dicts,
"test_samples_per_user": 200,
}

# Module-level name holding the selected system config.
current_config = grpc_system_config
1 change: 0 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@
# Start the scheduler
scheduler.install_config()
scheduler.initialize()

# Run the job
scheduler.run_job()
2 changes: 1 addition & 1 deletion src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def initialize(self, copy_souce_code: bool = True) -> None:
random.seed(seed)
numpy.random.seed(seed)
self.merge_configs()

if self.communication.get_rank() == 0:
if copy_souce_code:
copy_source_code(self.config)
Expand All @@ -130,6 +129,7 @@ def initialize(self, copy_souce_code: bool = True) -> None:
rank=self.communication.get_rank(),
comm_utils=self.communication,
)
self.communication.send_quorum()

def run_job(self) -> None:
self.node.run_protocol()
Expand Down
6 changes: 5 additions & 1 deletion src/utils/communication/comm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from utils.communication.grpc.main import GRPCCommunication
from typing import Any, Dict, List, Tuple, TYPE_CHECKING
# from utils.communication.mpi import MPICommUtils
# from mpi4py import MPI

if TYPE_CHECKING:
from algos.base_class import BaseNode
Expand All @@ -21,7 +22,7 @@ def create_communication(
):
comm_type = comm_type
if comm_type == CommunicationType.MPI:
raise NotImplementedError("MPI's new version not yet implemented. Please use GRPC. See https://github.com/aidecentralized/sonar/issues/96 for more details.")
return MPICommUtils(config)
elif comm_type == CommunicationType.GRPC:
return GRPCCommunication(config)
elif comm_type == CommunicationType.HTTP:
Expand Down Expand Up @@ -71,6 +72,9 @@ def receive(self, node_ids: List[int]) -> Any:
def broadcast(self, data: Any, tag: int = 0):
self.comm.broadcast(data)

def send_quorum(self):
"""Delegate the quorum notification to the underlying communication object (self.comm)."""
self.comm.send_quorum()

def all_gather(self, tag: int = 0):
return self.comm.all_gather()

Expand Down
12 changes: 11 additions & 1 deletion src/utils/communication/grpc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,18 @@ def initialize(self):
peer_ids=self.peer_ids_to_proto(self.servicer.peer_ids)
)
stub.send_peer_ids(proto_msg) # type: ignore
# stub.send_quorum(comm_pb2.Quorum(quorum=True)) # type: ignore

def send_quorum(self):
"""
Send the quorum status to all other nodes after peer IDs are sent.

Only acts when self.rank == 0. For every entry of self.servicer.peer_ids
that is not this node's own id, the host is looked up with
get_host_from_rank and the send_quorum RPC is called on it with
Quorum(quorum=True).
"""
if self.rank == 0:
for peer_id in self.servicer.peer_ids:
if not self.is_own_id(peer_id):
host = self.get_host_from_rank(peer_id)
# A separate insecure channel is opened per peer and closed when the with block exits.
with grpc.insecure_channel(host) as channel: # type: ignore
stub = comm_pb2_grpc.CommunicationServerStub(channel)
stub.send_quorum(comm_pb2.Quorum(quorum=True)) # type: ignore
# NOTE(review): the f-string below has no placeholders, so the f prefix is unnecessary.
print(f"Quorum status sent to all nodes.")

def get_host_from_rank(self, rank: int) -> str:
for peer_id in self.servicer.peer_ids:
Expand All @@ -370,7 +381,6 @@ def send_with_retries(self, dest_host: str, buffer: Any) -> Any:
raise Exception("Failed to send data. Receiver unreachable.")



def send(self, dest: str | int, data: OrderedDict[str, Any]):
"""
data should be a python dictionary
Expand Down
4 changes: 4 additions & 0 deletions src/utils/communication/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def send(self, dest: str | int, data: Any):
def receive(self, node_ids: List[int]) -> Any:
pass

@abstractmethod
def send_quorum(self) -> Any:
"""Send the quorum signal; abstract, so each concrete communication class must implement it."""
pass

@abstractmethod
def broadcast(self, data: Any):
pass
Expand Down
Loading

0 comments on commit cd614bb

Please sign in to comment.