New grpc #34

Merged · 9 commits · Aug 17, 2024
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
+{
+    "python.analysis.typeCheckingMode": "strict"
+}
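
For context, `strict` is Pyright's most stringent preset: unannotated function signatures are reported as errors, which is what motivates the type annotations added throughout `base_class.py` below. A minimal, hypothetical example (not from this repo) of code that `strict` flags but the default `basic` mode accepts:

```python
from typing import List


# Flagged under strict mode: the parameter is unannotated, so Pyright
# reports a reportMissingParameterType error and the inferred types
# downstream become unknown.
def mean(values):
    return sum(values) / len(values)


# Accepted under strict mode: fully annotated signature.
def mean_typed(values: List[float]) -> float:
    return sum(values) / len(values)
```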
5 changes: 5 additions & 0 deletions src/README.md
@@ -31,3 +31,8 @@ Client Metrics

Server Metrics
<img src="../resources/images/Server_metrics.png" width=50% height=50%>

+### Debugging instructions
+The gRPC simulation starts many threads, and currently if even one of them fails you have to kill all of them and start over.
+Here is a command to get the PIDs of all the threads and kill them all at once:
+`for pid in $(ps aux|grep 'python main.py -r' | cut -b 10-16); do kill -9 $pid; done`
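A simpler equivalent, assuming `pkill` is available on the system (the `cut -b 10-16` approach depends on the PID column position in `ps aux` output): `pkill -9 -f 'python main.py -r'`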
79 changes: 35 additions & 44 deletions src/algos/base_class.py
@@ -4,14 +4,14 @@
from torch.utils.data import DataLoader, Subset

from collections import OrderedDict
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
from torch import Tensor
import copy
import random
import numpy as np

+from utils.communication.comm_utils import CommunicationManager
from utils.plot_utils import PlotUtils
-from utils.comm_utils import CommUtils
from utils.data_utils import (
random_samples,
filter_by_class,
@@ -28,15 +28,15 @@
get_dset_balanced_communities,
get_dset_communities,
)
-import torchvision.transforms as T
+import torchvision.transforms as T  # type: ignore
import os

from yolo import YOLOLoss

class BaseNode(ABC):
-    def __init__(self, config) -> None:
-        self.comm_utils = CommUtils()
-        self.node_id = self.comm_utils.rank
+    def __init__(self, config: Dict[str, Any], comm_utils: CommunicationManager) -> None:
+        self.comm_utils = comm_utils
+        self.node_id = self.comm_utils.get_rank()

if self.node_id == 0:
self.log_dir = config['log_path']
@@ -54,21 +54,21 @@ def __init__(self, config) -> None:
if isinstance(config["dset"], dict):
if self.node_id != 0:
config["dset"].pop("0")
-            self.dset = config["dset"][str(self.node_id)]
+            self.dset = str(config["dset"][str(self.node_id)])
config["dpath"] = config["dpath"][self.dset]
else:
self.dset = config["dset"]

self.setup_cuda(config)
-        self.model_utils = ModelUtils()
+        self.model_utils = ModelUtils(self.device)

self.dset_obj = get_dataset(self.dset, dpath=config["dpath"])
self.set_constants()

def set_constants(self):
self.best_acc = 0.0

-    def setup_cuda(self, config):
+    def setup_cuda(self, config: Dict[str, Any]):
# Need a mapping from rank to device id
device_ids_map = config["device_ids"]
node_name = "node_{}".format(self.node_id)
@@ -82,7 +82,7 @@ def setup_cuda(self, config):
self.device = torch.device("cpu")
print("Using CPU")

-    def set_model_parameters(self, config):
+    def set_model_parameters(self, config: Dict[str, Any]):
# Model related parameters
optim_name = config.get("optimizer", "adam")
if optim_name == "adam":
@@ -149,7 +149,7 @@ def set_shared_exp_parameters(self, config):
self.log_utils.log_console("Communities: {}".format(self.communities))

@abstractmethod
-    def run_protocol(self):
+    def run_protocol(self) -> None:
raise NotImplementedError


@@ -158,8 +158,8 @@ class BaseClient(BaseNode):
Abstract class for all algorithms
"""

-    def __init__(self, config) -> None:
-        super().__init__(config)
+    def __init__(self, config, comm_utils) -> None:
+        super().__init__(config, comm_utils)
self.server_node = 0
self.set_parameters(config)

@@ -215,8 +215,8 @@ def set_data_parameters(self, config):
train_dset = self.dset_obj.train_dset
test_dset = self.dset_obj.test_dset

print("num train", len(train_dset))
print("num test", len(test_dset))
# print("num train", len(train_dset))
# print("num test", len(test_dset))

if config.get("test_samples_per_class", None) is not None:
test_dset, _ = balanced_subset(test_dset, config["test_samples_per_class"])
@@ -369,19 +369,19 @@ def is_same_dest(dset):
# TODO: fix print_data_summary
# self.print_data_summary(train_dset, test_dset, val_dset=val_dset)

-    def local_train(self, dataset, **kwargs):
+    def local_train(self, round: int, **kwargs: Any) -> None:
"""
Train the model locally
"""
raise NotImplementedError

-    def local_test(self, dataset, **kwargs):
+    def local_test(self, **kwargs: Any) -> float | Tuple[float, float] | None:
"""
Test the model locally
"""
raise NotImplementedError

-    def get_representation(self, **kwargs):
+    def get_representation(self, **kwargs: Any) -> OrderedDict[str, Tensor] | List[Tensor] | Tensor:
"""
Share the model representation
"""
@@ -416,30 +416,26 @@ def print_data_summary(self, train_test, test_dset, val_dset=None):
print("test count: ", i)
i += 1

print("Node: {} data distribution summary".format(self.node_id))
print(type(train_sample_per_class.items()))
print(
"Train samples per class: {}".format(sorted(train_sample_per_class.items()))
)
print(
"Train samples per class: {}".format(len(train_sample_per_class.items()))
)
if val_dset is not None:
print(
"Val samples per class: {}".format(len(val_sample_per_class.items()))
)
print(
"Test samples per class: {}".format(len(test_sample_per_class.items()))
)
# print("Node: {} data distribution summary".format(self.node_id))
# print(
# "Train samples per class: {}".format(sorted(train_sample_per_class.items()))
# )
# if val_dset is not None:
# print(
# "Val samples per class: {}".format(sorted(val_sample_per_class.items()))
# )
# print(
# "Test samples per class: {}".format(sorted(test_sample_per_class.items()))
# )


class BaseServer(BaseNode):
"""
Abstract class for orchestrator
"""

-    def __init__(self, config) -> None:
-        super().__init__(config)
+    def __init__(self, config, comm_utils) -> None:
+        super().__init__(config, comm_utils)
self.num_users = config["num_users"]
self.users = list(range(1, self.num_users + 1))
self.set_data_parameters(config)
@@ -449,13 +445,13 @@ def set_data_parameters(self, config):
batch_size = config["batch_size"]
self._test_loader = DataLoader(test_dset, batch_size=batch_size)

-    def aggregate(self, representation_list, **kwargs):
+    def aggregate(self, representation_list: List[OrderedDict[str, Tensor]], **kwargs: Any) -> OrderedDict[str, Tensor]:
"""
Aggregate the knowledge from the users
"""
raise NotImplementedError

-    def test(self, dataset, **kwargs):
+    def test(self, **kwargs: Any) -> List[float]:
"""
Test the model on the server
"""
@@ -668,10 +664,5 @@ def __init__(self, config, comm_protocol=CommProtocol) -> None:
super().__init__(config)
self.tag = comm_protocol

-    def send_representations(self, representations, tag=None):
-        for user_node in self.users:
-            self.comm_utils.send_signal(
-                dest=user_node,
-                data=representations,
-                tag=self.tag.REPRS_SHARE if tag is None else tag,
-            )
+    def send_representations(self, representations: Dict[int, OrderedDict[str, Tensor]]):
+        self.comm_utils.broadcast(representations)
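
For readers following the refactor: the old code pushed representations to each user individually via `send_signal`, while the new `CommunicationManager` exposes a single `broadcast` call. Only `get_rank()` and `broadcast()` appear in this diff; below is a minimal sketch of the interface those two calls imply. Everything beyond the two method names (the abstract base class, signatures, docstrings) is an assumption, not the repo's actual API.

```python
from abc import ABC, abstractmethod
from typing import Any


class CommunicationManager(ABC):
    """Hypothetical interface implied by this diff; only the two
    method names below are taken from the code above."""

    @abstractmethod
    def get_rank(self) -> int:
        """Return this node's integer rank (0 is conventionally the server)."""
        ...

    @abstractmethod
    def broadcast(self, data: Any) -> None:
        """Send `data` to every other node in the simulation."""
        ...
```

Injecting the manager through the constructor (rather than building `CommUtils()` inside `BaseNode`) also lets tests substitute a mock transport without touching the node classes.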