Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated GitHub Actions Test for gRPC Training #148

Merged
merged 22 commits into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/train.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# CI workflow: installs the project's dependencies and runs a short gRPC
# training job to check that the training code still starts and completes.
name: Test Training Code with gRPC

on:
  workflow_dispatch:
  push:
    branches:
      # - main
      # NOTE: "*" does not match branch names containing "/"; use "**" to cover those too.
      - "*"
  pull_request:
    branches:
      - main

env:
  # NOTE(review): GitHub documents step debug logging as being enabled through a
  # repository secret/variable named ACTIONS_STEP_DEBUG; confirm that this
  # workflow-level env entry has the intended effect.
  ACTIONS_STEP_DEBUG: true

jobs:
  train-check:
    runs-on: ubuntu-latest

    steps:
      # Step 1: Checkout the code
      - name: Checkout repository
        uses: actions/checkout@v4

      # Step 2: Set up Python
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10" # Quoted so YAML does not parse it as the float 3.1

      # Step 3: Install dependencies
      - name: Install dependencies
        run: |
          # apt-get (not apt) is the interface intended for scripts
          sudo apt-get update
          sudo apt-get install -y libopenmpi-dev openmpi-bin
          sudo apt-get install -y libgl1 libglib2.0-0

          pip install -r requirements.txt

      # Step 4: Run gRPC server and client
      - name: Run test
        run: |
          cd src
          # chmod +x ./configs/algo_config_test.py

          echo "starting main grpc"
          python main_grpc.py -n 4 -host localhost
          echo "starting main"
          python main.py -super true -s "./configs/sys_config_test.py"
          echo "done"

# Further checks to consider:
#   - only 5 rounds
#   - gRPC only, or also MPI?
#   - number of samples
#   - number of users and nodes
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"python.analysis.typeCheckingMode": "strict"
}
"python.analysis.typeCheckingMode": "strict"
}
1 change: 0 additions & 1 deletion src/algos/fl.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def fed_avg(self, model_wts: List[OrderedDict[str, Tensor]]):
num_users = len(model_wts)
coeff = 1 / num_users
avgd_wts: OrderedDict[str, Tensor] = OrderedDict()

for key in model_wts[0].keys():
avgd_wts[key] = sum(coeff * m[key] for m in model_wts) # type: ignore

Expand Down
3 changes: 2 additions & 1 deletion src/configs/algo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
"algo": "fedstatic",
"topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore
"rounds": 3,

# Model parameters
"optimizer": "sgd", # TODO comment out for real training
"model": "resnet10",
Expand Down Expand Up @@ -362,4 +361,6 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
malicious_traditional_model_update_attack,
]


default_config_list: List[ConfigType] = [traditional_fl]
# default_config_list: List[ConfigType] = [fedstatic, fedstatic, fedstatic, fedstatic]
26 changes: 26 additions & 0 deletions src/configs/algo_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from utils.types import ConfigType

# fedstatic: ConfigType = {
# # Collaboration setup
# "algo": "fedstatic",
# "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore
# "rounds": 1,

# # Model parameters
# "model": "resnet10",
# "model_lr": 3e-4,
# "batch_size": 256,
# }

# Federated-averaging ("fedavg") algorithm configuration used by the test
# system config. Presumably kept to a single round so the automated run
# finishes quickly -- TODO confirm.
traditional_fl: ConfigType = {
    # Collaboration setup
    "algo": "fedavg",
    "rounds": 1,

    # Model parameters
    "model": "resnet10",
    "model_lr": 3e-4,
    "batch_size": 256,
}

# default_config_list: List[ConfigType] = [fedstatic, fedstatic, fedstatic, fedstatic]
15 changes: 9 additions & 6 deletions src/configs/sys_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
CIAR10_DPATH = "./datasets/imgs/cifar10/"

NUM_COLLABORATORS = 1
DUMP_DIR = "/mas/camera/Experiments/SONAR/abhi/"
# DUMP_DIR = "../../../../../../../home/"
DUMP_DIR = "/tmp/"

num_users = 3
mpi_system_config: ConfigType = {
"exp_id": "",
"comm": {"type": "MPI"},
"num_users": num_users,
"num_collaborators": NUM_COLLABORATORS,
"dset": CIFAR10_DSET,
"dump_dir": DUMP_DIR,
Expand All @@ -177,11 +180,9 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
# "algo": get_algo_configs(num_users=3, algo_configs=algo_configs_list),
"algos": get_algo_configs(
num_users=3,
algo_configs=malicious_algo_config_list,
assignment_method="distribution",
distribution={0: 1, 1: 1, 2: 1},
algo_configs=default_config_list
), # type: ignore
"samples_per_user": 1000, # TODO: To model scenarios where different users have different number of samples
"samples_per_user": 5555, # TODO: To model scenarios where different users have different number of samples
# we need to make this a dictionary with user_id as key and number of samples as value
"train_label_distribution": "iid", # Either "iid", "non_iid" "support"
"test_label_distribution": "iid", # Either "iid", "non_iid" "support"
Expand Down Expand Up @@ -348,7 +349,8 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
"device_ids": get_device_ids(num_users, gpu_ids),
# "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore
"algos": get_algo_configs(num_users=num_users, algo_configs=[fedstatic]), # type: ignore
"samples_per_user": 50000 // num_users, # distributed equally
# "samples_per_user": 50000 // num_users, # distributed equally
"samples_per_user": 100,
"train_label_distribution": "non_iid",
"test_label_distribution": "iid",
"alpha_data": 1.0,
Expand Down Expand Up @@ -389,3 +391,4 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):

current_config = grpc_system_config
# current_config = mpi_system_config

126 changes: 126 additions & 0 deletions src/configs/sys_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Dict, List, Literal, Optional
import random
from utils.types import ConfigType

from .algo_config_test import (
traditional_fl
)

def get_device_ids(num_users: int, gpus_available: List[int | Literal["cpu"]]) -> Dict[str, List[int | Literal["cpu"]]]:
"""
Get the GPU device IDs for the users.
"""
# TODO: Make it multi-host
device_ids: Dict[str, List[int | Literal["cpu"]]] = {}
for i in range(num_users + 1): # +1 for the super-node
index = i % len(gpus_available)
gpu_id = gpus_available[index]
device_ids[f"node_{i}"] = [gpu_id]
return device_ids


def get_algo_configs(
    num_users: int,
    algo_configs: List[ConfigType],
    assignment_method: Literal[
        "sequential", "random", "mapping", "distribution"
    ] = "sequential",
    seed: Optional[int] = 1,
    mapping: Optional[List[int]] = None,
    distribution: Optional[Dict[int, int]] = None,
) -> Dict[str, ConfigType]:
    """
    Assign an algorithm configuration to each node, allowing for repetition.

    The super-node (``node_0``) always receives ``algo_configs[0]``. User nodes
    ``node_1 .. node_<num_users>`` are filled according to ``assignment_method``:

    sequential: node ``i`` gets ``algo_configs[i % len(algo_configs)]``
    random: each node gets ``random.choice(algo_configs)`` (global RNG, not seeded here)
    mapping: node ``i`` gets ``algo_configs[mapping[i - 1]]``
    distribution: ``distribution`` maps algo index -> number of nodes; the node
        indices are shuffled with ``seed`` and handed out in that order

    Args:
        num_users: Number of user nodes (the super-node is not counted).
        algo_configs: Candidate configurations; must not be empty.
        assignment_method: One of the four strategies above.
        seed: Seed used for the shuffle in "distribution" mode only.
        mapping: Algo index per user node; required for "mapping".
        distribution: Algo index -> node count; required for "distribution".

    Returns:
        Dict from ``node_<i>`` to the configuration assigned to that node.

    Raises:
        ValueError: If ``algo_configs`` is empty, a required argument is
            missing, its size does not match ``num_users``, or the assignment
            method is unknown.
    """
    if not algo_configs:
        raise ValueError("algo_configs must contain at least one configuration")

    algo_config_map: Dict[str, ConfigType] = {"node_0": algo_configs[0]}  # Super-node
    user_indices = range(1, num_users + 1)

    if assignment_method == "sequential":
        for i in user_indices:
            algo_config_map[f"node_{i}"] = algo_configs[i % len(algo_configs)]
    elif assignment_method == "random":
        for i in user_indices:
            algo_config_map[f"node_{i}"] = random.choice(algo_configs)
    elif assignment_method == "mapping":
        if not mapping:
            raise ValueError("Mapping must be provided for assignment method 'mapping'")
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if len(mapping) != num_users:
            raise ValueError(
                f"Mapping has {len(mapping)} entries but num_users is {num_users}"
            )
        for i, algo_index in enumerate(mapping, start=1):
            algo_config_map[f"node_{i}"] = algo_configs[algo_index]
    elif assignment_method == "distribution":
        if not distribution:
            raise ValueError(
                "Distribution must be provided for assignment method 'distribution'"
            )
        total_users = sum(distribution.values())
        if total_users != num_users:
            raise ValueError(
                f"Distribution covers {total_users} nodes but num_users is {num_users}"
            )

        # List of node indices to assign
        node_indices = list(range(1, total_users + 1))
        # Seed for reproducibility. NOTE: this reseeds the module-level RNG,
        # which is kept so that later `random` calls behave as they did before.
        random.seed(seed)
        # Shuffle the node indices based on the seed
        random.shuffle(node_indices)

        # Hand out the shuffled node indices, `num_nodes` at a time per algo
        shuffled_nodes = iter(node_indices)
        for algo_index, num_nodes in distribution.items():
            for _ in range(num_nodes):
                algo_config_map[f"node_{next(shuffled_nodes)}"] = algo_configs[algo_index]
    else:
        raise ValueError(f"Invalid assignment method: {assignment_method}")
    return algo_config_map

# Dataset name and on-disk path used by the test system configuration.
CIFAR10_DSET = "cifar10"
# NOTE(review): "CIAR10" looks like a misspelling of "CIFAR10"; the same name is
# used where the constant is read below, so renaming must be done in both places.
CIAR10_DPATH = "./datasets/imgs/cifar10/"

# DUMP_DIR = "../../../../../../../home/"
DUMP_DIR = "/tmp/"

NUM_COLLABORATORS = 1
num_users = 4

# Dropout settings applied to every user node; a dropout_rate of 0.0 is used here.
dropout_dict = {
    "distribution_dict": { # leave dict empty to disable dropout
        "method": "uniform",  # "uniform", "normal"
        "parameters": {}  # "mean": 0.5, "std": 0.1 in case of normal distribution
    },
    "dropout_rate": 0.0,  # cutoff for dropout: [0,1]
    "dropout_correlation": 0.0,  # correlation between dropouts of successive rounds: [0,1]
}

# node_0 (the super-node) gets an empty dropout config; every user node
# references the SAME dropout_dict object, so mutating one entry affects all.
dropout_dicts = {"node_0": {}}
for i in range(1, num_users + 1):
    dropout_dicts[f"node_{i}"] = dropout_dict

# Device ids handed to get_device_ids below.
# NOTE(review): confirm these GPU indices exist on the machine that runs the test.
gpu_ids = [2, 3, 5, 6]

# System configuration for the gRPC test run: num_users user nodes plus the
# super-node, all assigned the traditional_fl algorithm config.
grpc_system_config: ConfigType = {
    "exp_id": "static",
    "num_users": num_users,
    "num_collaborators": NUM_COLLABORATORS,
    "comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["localhost:50048"]},  # The super-node
    "dset": CIFAR10_DSET,
    "dump_dir": DUMP_DIR,
    "dpath": CIAR10_DPATH,
    "seed": 2,
    "device_ids": get_device_ids(num_users, gpu_ids),
    # "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore
    # A single-element list with the default "sequential" method gives every node the same config.
    "algos": get_algo_configs(num_users=num_users, algo_configs=[traditional_fl]),  # type: ignore
    # "samples_per_user": 50000 // num_users, # distributed equally
    "samples_per_user": 100,
    "train_label_distribution": "non_iid",
    "test_label_distribution": "iid",
    "alpha_data": 1.0,
    "exp_keys": [],
    "dropout_dicts": dropout_dicts,
    "test_samples_per_user": 200,
}

# The configuration exported by this module.
current_config = grpc_system_config
1 change: 0 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@
# Start the scheduler
scheduler.install_config()
scheduler.initialize()

# Run the job
scheduler.run_job()
2 changes: 1 addition & 1 deletion src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def initialize(self, copy_souce_code: bool = True) -> None:
random.seed(seed)
numpy.random.seed(seed)
self.merge_configs()

if self.communication.get_rank() == 0:
if copy_souce_code:
copy_source_code(self.config)
Expand All @@ -130,6 +129,7 @@ def initialize(self, copy_souce_code: bool = True) -> None:
rank=self.communication.get_rank(),
comm_utils=self.communication,
)
self.communication.send_quorum()

def run_job(self) -> None:
self.node.run_protocol()
Expand Down
6 changes: 5 additions & 1 deletion src/utils/communication/comm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from utils.communication.grpc.main import GRPCCommunication
from typing import Any, Dict, List, Tuple, TYPE_CHECKING
# from utils.communication.mpi import MPICommUtils
# from mpi4py import MPI

if TYPE_CHECKING:
from algos.base_class import BaseNode
Expand All @@ -21,7 +22,7 @@ def create_communication(
):
comm_type = comm_type
if comm_type == CommunicationType.MPI:
raise NotImplementedError("MPI's new version not yet implemented. Please use GRPC. See https://github.com/aidecentralized/sonar/issues/96 for more details.")
return MPICommUtils(config)
elif comm_type == CommunicationType.GRPC:
return GRPCCommunication(config)
elif comm_type == CommunicationType.HTTP:
Expand Down Expand Up @@ -71,6 +72,9 @@ def receive(self, node_ids: List[int]) -> Any:
def broadcast(self, data: Any, tag: int = 0):
    """Forward ``data`` to ``self.comm.broadcast``; ``tag`` is accepted but not passed on."""
    self.comm.broadcast(data)

def send_quorum(self) -> None:
    """Delegate to ``self.comm.send_quorum()``; the backend's return value is discarded."""
    self.comm.send_quorum()

def all_gather(self, tag: int = 0):
    """Return the result of ``self.comm.all_gather()``; ``tag`` is accepted but not passed on."""
    return self.comm.all_gather()

Expand Down
12 changes: 11 additions & 1 deletion src/utils/communication/grpc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,18 @@ def initialize(self):
peer_ids=self.peer_ids_to_proto(self.servicer.peer_ids)
)
stub.send_peer_ids(proto_msg) # type: ignore
# stub.send_quorum(comm_pb2.Quorum(quorum=True)) # type: ignore

def send_quorum(self):
    """
    Send the quorum status to all nodes after peer IDs are sent.

    Only the node with rank 0 sends anything; every other rank returns
    immediately. For each entry of ``self.servicer.peer_ids`` that is not this
    node's own id, an insecure gRPC channel is opened to that peer's host and
    ``send_quorum`` is called on its stub with ``Quorum(quorum=True)``.
    """
    if self.rank != 0:
        return

    for peer_id in self.servicer.peer_ids:
        if self.is_own_id(peer_id):
            continue
        host = self.get_host_from_rank(peer_id)
        # One short-lived channel per peer; closed when the `with` block exits.
        with grpc.insecure_channel(host) as channel:  # type: ignore
            stub = comm_pb2_grpc.CommunicationServerStub(channel)
            stub.send_quorum(comm_pb2.Quorum(quorum=True))  # type: ignore
    # Reported once, after all peers were contacted, and only by the sender.
    print("Quorum status sent to all nodes.")

def get_host_from_rank(self, rank: int) -> str:
for peer_id in self.servicer.peer_ids:
Expand All @@ -370,7 +381,6 @@ def send_with_retries(self, dest_host: str, buffer: Any) -> Any:
raise Exception("Failed to send data. Receiver unreachable.")



def send(self, dest: str | int, data: OrderedDict[str, Any]):
"""
data should be a python dictionary
Expand Down
4 changes: 4 additions & 0 deletions src/utils/communication/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def send(self, dest: str | int, data: Any):
def receive(self, node_ids: List[int]) -> Any:
pass

@abstractmethod
def send_quorum(self) -> Any:
    """Send the quorum signal; abstract, so concrete subclasses must override it."""
    pass

@abstractmethod
def broadcast(self, data: Any):
    """Broadcast ``data``; abstract, so concrete subclasses must override it."""
    pass
Expand Down
Loading
Loading