diff --git a/.gitignore b/.gitignore index 59dcfe7..8271ce7 100644 --- a/.gitignore +++ b/.gitignore @@ -129,8 +129,7 @@ data/ **/checkpoint/ experimental/ -# VSCode and JetBrains -.idea/ +# VSCode .vscode/ .DS_Store @@ -141,13 +140,3 @@ examples/.profile/* # timeline log file .log/* .profile/ - -# output -img/* -wandb/* -output/* -output* - -# script -run**.sh -scripts/ \ No newline at end of file diff --git a/bluefoglite/common/__init__.py b/bluefoglite/common/__init__.py index c5360ae..e69de29 100644 --- a/bluefoglite/common/__init__.py +++ b/bluefoglite/common/__init__.py @@ -1 +0,0 @@ -from bluefoglite.common import topology diff --git a/bluefoglite/common/handle_manager.py b/bluefoglite/common/handle_manager.py index a1fa2d4..7708e57 100644 --- a/bluefoglite/common/handle_manager.py +++ b/bluefoglite/common/handle_manager.py @@ -41,7 +41,6 @@ class EventStatus: DONE_EVENT = EventStatus(status=EventStatusEnum.DONE, err="") - # This should be a singleton class class HandleManager: __instance: Optional["HandleManager"] = None diff --git a/bluefoglite/common/optimizers.py b/bluefoglite/common/optimizers.py deleted file mode 100644 index ed35aa8..0000000 --- a/bluefoglite/common/optimizers.py +++ /dev/null @@ -1,234 +0,0 @@ -from enum import Enum -import itertools -import warnings - -import torch -import bluefoglite.torch_api as bfl - - -class CommunicationType(Enum): - neighbor_allreduce = "neighbor.allreduce" - allreduce = "allreduce" - empty = "empty" - - -_warning_message_num_step_per_communication = ( - "Unexpected behavior:\n" - " After num_steps_per_communication times of forward computation `y=model(x)` are called,\n" - " an optimizer step() function must be called.\n" - " It does not matter how many step() functions are called in between.\n" - " Please adjust num_step_per_communication to update model parameters locally.\n" - " More information can be found in the FAQ page.\n" -) -_warning_message_backward_pass_per_step = ( - "Unexpected behavior:\n" - " After num_steps_per_communication times of backward computation `loss.backward()` are called,\n" - " an optimizer step() function must be called.\n" - " It does not matter how many step() functions are called in between.\n" - " Please adjust num_steps_per_communication to accumulate gradients locally.\n" - " More information can be found in the FAQ page.\n" -) - - -def _named_leaf_module(module, parent_name=None): - """Yield an iterator over all leaf modules.""" - if not list(module.named_children()): - yield (parent_name, module) - for name, ch_module in module.named_children(): - full_name = parent_name + "." + name if parent_name else name - yield from _named_leaf_module(ch_module, full_name) - - -def _find_duplicates(lst): - seen = set() - dups = set() - for el in lst: - if el in seen: - dups.add(el) - seen.add(el) - return dups - - -def _check_named_parameters(optimizer, model): - _models = None - if isinstance(model, torch.nn.Module): - _models = [model] - if isinstance(model, list): - for m in model: - assert isinstance(m, torch.nn.Module) - _models = model - assert _models is not None - named_parameters = list(itertools.chain(*[m.named_parameters() for m in _models])) - - # make sure that named_parameters are tuples - if any([not isinstance(p, tuple) for p in named_parameters]): - raise ValueError( - "named_parameters should be a sequence of " - "tuples (name, parameter), usually produced by " - "model.named_parameters()." 
- ) - - dups = _find_duplicates([k for k, _ in named_parameters]) - if dups: - raise ValueError( - "Parameter names in named_parameters must be unique. " - "Found duplicates: %s" % ", ".join(dups) - ) - - all_param_ids = { - id(v) for param_group in optimizer.param_groups for v in param_group["params"] - } - named_param_ids = {id(v) for k, v in named_parameters} - unnamed_param_ids = all_param_ids - named_param_ids - if unnamed_param_ids: - raise ValueError( - "Named parameters provided by model are mismatch with the parameters" - "handled by optimizer. Python object ids: " - "%s" % ", ".join(str(id) for id in unnamed_param_ids) - ) - return named_parameters, _models - - -class _DistributedReduceOptimizer(torch.optim.Optimizer): - def __init__( - self, params, model, communication_type, num_steps_per_communication=1 - ): - super(self.__class__, self).__init__(params) - - named_parameters, models = _check_named_parameters(self, model) - # knobs for neighbor communication behavior - self.self_weight = None - self.src_weights = None - self.dst_weights = None - self.src_machine_weights = None - self.dst_machine_weights = None - self.enable_topo_check = False - - self._models = models - self._parameter_names = {v: k for k, v in sorted(named_parameters)} - self._name_parameters = {k: v for k, v in sorted(named_parameters)} - self._async_works = {} - self._requires_update = set() - self._synchronized = False - self._should_synchronize = True - self._error_encountered = False - self._num_steps_per_communication = num_steps_per_communication - assert isinstance(communication_type, CommunicationType) - self._communication_type = communication_type - - self._reduce_delay = { - v: self._num_steps_per_communication for _, v in sorted(named_parameters) - } - if bfl.size() > 1: - self._register_hooks() - - def _register_hooks(self): - for model in self._models: - # The hook is added at model level instead of layer level, as it avoids triggering - # the hook function of the same layer multiple times in case the layer is called - # several times during the forward computation of the model. - model.register_forward_hook(self._make_hook()) - self._requires_update.update(dict(model.named_parameters()).values()) - - def _make_hook(self): - def hook(model, *unused): - for parent_name, layer in _named_leaf_module(model): - for name, p in layer.named_parameters(): - if not layer.training: - continue - if ( - self._name_parameters.get(parent_name + "." + name, None) - is None - ): - # Some case like encoder-decode, which shared the same weights. - continue - if p.requires_grad: - if self._reduce_delay[p] <= 0: - if not self._error_encountered: - warnings.warn( - _warning_message_num_step_per_communication - ) - self._error_encountered = True - self._reduce_delay[p] -= 1 - if self._reduce_delay[p] == 0: - if self._communication_type == CommunicationType.allreduce: - async_work = self._allreduce_data_async(p) - elif ( - self._communication_type - == CommunicationType.neighbor_allreduce - ): - async_work = self._neighbor_allreduce_data_async(p) - elif self._communication_type == CommunicationType.empty: - async_work = None - else: - raise ValueError( - "Unsuppported CommunicationType encountered." 
- ) - self._async_works[p] = async_work - - return hook - - def _neighbor_allreduce_data_async(self, p): - async_work = bfl.neighbor_allreduce_nonblocking( - p.data, - self_weight=self.self_weight, - src_weights=self.src_weights, - dst_weights=self.dst_weights, - ) - return async_work - - def _allreduce_data_async(self, p): - async_work = bfl.allreduce_nonblocking(p.data) - return async_work - - @property - def communication_type(self): - return self._communication_type - - @communication_type.setter - def communication_type(self, value): - assert isinstance(value, CommunicationType) - self._communication_type = value - - def synchronize(self): - with torch.no_grad(): - for p, async_work in self._async_works.items(): - if async_work is not None: - output = async_work.wait() - p.set_(output) - # self._async_works[p] = self._num_steps_per_communication - self._reduce_delay[p] = self._num_steps_per_communication - self._async_works.clear() - self._synchronized = True - - def step(self, closure=None): - # consensus style is the easiest way to implement it. - if self._should_synchronize: - if self._synchronized: - warnings.warn( - "optimizer.step() called without " - "optimizer.skip_synchronize() context after " - "optimizer.synchronize(). This can cause training " - "slowdown. You may want to consider using " - "optimizer.skip_synchronize() context if you use " - "optimizer.synchronize() in your code." - ) - self.synchronize() - self._synchronized = False - return super(self.__class__, self).step(closure) - - -def DistributedAdaptWithCombineOptimizer( - optimizer, - model, - communication_type=CommunicationType.neighbor_allreduce, - num_steps_per_communication=1, -): - cls = type( - optimizer.__class__.__name__, - (optimizer.__class__,), - dict(_DistributedReduceOptimizer.__dict__), - ) - return cls( - optimizer.param_groups, model, communication_type, num_steps_per_communication - ) diff --git a/bluefoglite/common/tcp/agent.py b/bluefoglite/common/tcp/agent.py index 5c812f4..e0aaa36 100644 --- a/bluefoglite/common/tcp/agent.py +++ b/bluefoglite/common/tcp/agent.py @@ -21,7 +21,6 @@ from bluefoglite.common.tcp.pair import Pair, SocketFullAddress, TAddress from bluefoglite.common.logger import Logger - # One agent can contain multiple Contexts. # Each Context should represent entire communication group like (comm in MPI) # In each Context, it contains multiple Pairs, i.e. socket pair, talking to other neighbor. diff --git a/bluefoglite/common/tcp/message_pb2.py b/bluefoglite/common/tcp/message_pb2.py index c197b87..3d69fbe 100644 --- a/bluefoglite/common/tcp/message_pb2.py +++ b/bluefoglite/common/tcp/message_pb2.py @@ -2,9 +2,11 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: message.proto """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder +from google.protobuf.internal import enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) @@ -15,8 +17,37 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rmessage.proto\x12\x0b\x62luefoglite\"\xb8\x01\n\x06Header\x12\x16\n\x0e\x63ontent_length\x18\x01 \x01(\x06\x12.\n\x0cmessage_type\x18\x02 \x01(\x0e\x32\x18.bluefoglite.MessageType\x12\x0c\n\x04ndim\x18\x03 \x01(\x07\x12!\n\x05\x64type\x18\x04 \x01(\x0e\x32\x12.bluefoglite.DType\x12\x10\n\x08itemsize\x18\x05 \x01(\x07\x12\x14\n\x0cnum_elements\x18\x06 \x01(\x06\x12\r\n\x05shape\x18\x07 \x03(\x05*j\n\x0bMessageType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0f\n\x0bSEND_BUFFER\x10\x01\x12\x0f\n\x0bRECV_BUFFER\x10\x02\x12\x15\n\x11NOTIFY_SEND_READY\x10\x03\x12\x15\n\x11NOTIFY_RECV_READY\x10\x04*\xc2\x01\n\x05\x44Type\x12\r\n\tBFL_UINT8\x10\x00\x12\x0c\n\x08\x42\x46L_INT8\x10\x01\x12\x0e\n\nBFL_UINT16\x10\x02\x12\r\n\tBFL_INT16\x10\x03\x12\r\n\tBFL_INT32\x10\x04\x12\r\n\tBFL_INT64\x10\x05\x12\x0f\n\x0b\x42\x46L_FLOAT16\x10\x06\x12\x0f\n\x0b\x42\x46L_FLOAT32\x10\x07\x12\x0f\n\x0b\x42\x46L_FLOAT64\x10\x08\x12\x10\n\x0c\x42\x46L_FLOAT128\x10\t\x12\x0c\n\x08\x42\x46L_BOOL\x10\n\x12\x0c\n\x08\x42\x46L_BYTE\x10\x0b') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'message_pb2', globals()) +_MESSAGETYPE = DESCRIPTOR.enum_types_by_name['MessageType'] +MessageType = enum_type_wrapper.EnumTypeWrapper(_MESSAGETYPE) +_DTYPE = DESCRIPTOR.enum_types_by_name['DType'] +DType = enum_type_wrapper.EnumTypeWrapper(_DTYPE) +UNKNOWN = 0 +SEND_BUFFER = 1 +RECV_BUFFER = 2 +NOTIFY_SEND_READY = 3 +NOTIFY_RECV_READY = 4 +BFL_UINT8 = 0 +BFL_INT8 = 1 +BFL_UINT16 = 2 +BFL_INT16 = 3 +BFL_INT32 = 4 +BFL_INT64 = 5 +BFL_FLOAT16 = 6 +BFL_FLOAT32 = 7 +BFL_FLOAT64 = 8 +BFL_FLOAT128 = 9 +BFL_BOOL = 10 +BFL_BYTE = 11 + + +_HEADER = DESCRIPTOR.message_types_by_name['Header'] +Header = _reflection.GeneratedProtocolMessageType('Header', (_message.Message,), { + 'DESCRIPTOR' : _HEADER, + '__module__' : 'message_pb2' + # @@protoc_insertion_point(class_scope:bluefoglite.Header) + }) +_sym_db.RegisterMessage(Header) + if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None diff --git a/bluefoglite/common/tcp/pair.py b/bluefoglite/common/tcp/pair.py index ed22e66..82746fc 100644 --- a/bluefoglite/common/tcp/pair.py +++ b/bluefoglite/common/tcp/pair.py @@ -47,7 +47,6 @@ # AF_NETLINK, AF_TIPC) or strings (AF_UNIX). 
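
A minimal sketch (not part of the diff) of how the regenerated message_pb2 module above is typically consumed; the field and enum names come from the serialized descriptor, while the surrounding socket plumbing in pair.py is assumed.

from bluefoglite.common.tcp import message_pb2

# Build a header describing a 2x3 float32 buffer about to be sent.
header = message_pb2.Header(
    message_type=message_pb2.SEND_BUFFER,
    dtype=message_pb2.BFL_FLOAT32,
    ndim=2,
    itemsize=4,
    num_elements=6,
    content_length=24,  # num_elements * itemsize
    shape=[2, 3],
)
payload = header.SerializeToString()  # bytes written ahead of the tensor data
parsed = message_pb2.Header()
parsed.ParseFromString(payload)       # symmetric decode on the receiving pair
assert list(parsed.shape) == [2, 3]
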
TAddress = Union[Tuple[Any, ...], str] - # Socket related constant nomenclature follows: # AddressFamily startswith 'AF_' # SocketKind startswith 'SOCK_' diff --git a/bluefoglite/common/topology.py b/bluefoglite/common/topology.py index 9bd2b77..4297a1f 100644 --- a/bluefoglite/common/topology.py +++ b/bluefoglite/common/topology.py @@ -14,7 +14,7 @@ # ============================================================================== import math -from typing import Dict, Optional, Tuple, Iterator, List +from typing import Dict, Optional, Tuple import numpy as np import networkx as nx @@ -185,242 +185,3 @@ def RingGraph(size: int, connect_style: int = 0) -> nx.DiGraph: topo[i] = np.roll(x, i) G = nx.from_numpy_array(topo, create_using=nx.DiGraph) return G - - -def GetDynamicOnePeerSendRecvRanks( - topo: nx.DiGraph, self_rank: int -) -> Iterator[Tuple[List[int], List[int]]]: - """A utility function to generate 1-outoging send rank and corresponding recieving rank(s). - - Args: - topo (nx.DiGraph): The base topology to generate dynamic send and receive ranks. - self_rank (int): The self rank. - - Yields: - Iterator[Tuple[List[int], List[int]]]: send_ranks, recv_ranks. - """ - # Generate all outgoing ranks sorted by clock-wise. (Imagine all ranks put on a clock.) - size = topo.number_of_nodes() - sorted_send_ranks = [] - for rank in range(size): - sorted_ranks = sorted( - topo.successors(rank), - key=lambda r, rk=rank: r - rk if r >= rk else r - rk + size, - ) - if sorted_ranks[0] == rank: - sorted_ranks = sorted_ranks[1:] # remove the self-loop - sorted_send_ranks.append(sorted_ranks) - - self_degree = topo.out_degree(self_rank) - 1 - index = 0 - while True: - send_rank = sorted_send_ranks[self_rank][index % self_degree] - recv_ranks = [] - for other_rank in range(size): - if other_rank == self_rank: - continue - degree = topo.out_degree(other_rank) - 1 - if sorted_send_ranks[other_rank][index % degree] == self_rank: - recv_ranks.append(other_rank) - - yield [send_rank], recv_ranks - index += 1 - - -def GetExp2DynamicSendRecvMachineRanks( - world_size: int, local_size: int, self_rank: int, local_rank: int -) -> Iterator[Tuple[List[int], List[int]]]: - """ - A utility function to generate 1-outgoing send machine id and corresponding recieving - machine id(s) for Exponentia-2 topology. - - Args: - world_size (int): the size of all nodes; world_size = num_machines * nodes_per_machine - local_size (int): number of nodes in each machine - self_rank (int): The self rank. - local_rank (int): The self local rank. - - Yields: - Iterator[Tuple[List[int], List[int]]]: send_machine_ids, recv_machine_ids. - - Warning: - This function should be used under homogeneous enviroment only, i.e. all machines have - the same number of local processes. - """ - assert ( - self_rank % local_size - ) == local_rank, "It should be used under homogeneous environment only." - assert ( - world_size % local_size - ) == 0, "It should be used under homogeneous environment only." - assert ( - world_size > local_size - ), "It should be used under at least two machines case." 
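
The removed GetDynamicOnePeerSendRecvRanks generator above rotates through one outgoing neighbor per round; a small illustrative sketch of driving it, modeled on the __main__ block deleted at the end of this file (the ranks shown assume an 8-node exponential graph and are illustrative only).

from bluefoglite.common.topology import ExponentialGraph, GetDynamicOnePeerSendRecvRanks

topo = ExponentialGraph(8)                 # static base graph over ranks 0..7
gen = GetDynamicOnePeerSendRecvRanks(topo, self_rank=0)
for _ in range(3):
    send_ranks, recv_ranks = next(gen)     # e.g. ([1], [7]), then ([2], [6]), then ([4], [4])
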
- - machine_id = self_rank // local_size - machine_size = world_size // local_size - exp_2_size = int(np.log2(machine_size - 1)) if machine_size > 1 else 0 - index = 0 - while True: - machine_dist = 2 ** (index % (exp_2_size + 1)) - send_machine_rank = (machine_id + machine_dist) % machine_size - recv_machine_ranks = (machine_id - machine_dist) % machine_size - yield [send_machine_rank], [recv_machine_ranks] - index += 1 - - -def GetInnerOuterRingDynamicSendRecvRanks( - world_size: int, local_size: int, self_rank: int -) -> Iterator[Tuple[List[int], List[int]]]: - """ - A utility function to generate 1-outgoing send rank and corresponding recieving rank(s) - for Inner-Ring-Outer-Ring topology. - - Args: - world_size (int): the size of all nodes; world_size = num_machines * nodes_per_machine - local_size (int): number of nodes in each machine - self_rank (int): The self rank. - - Yields: - Iterator[Tuple[List[int], List[int]]]: send_ranks, recv_ranks. - """ - num_machines = world_size // local_size - nodes_per_machine = local_size - assert ( - world_size % local_size == 0 - ), "It should be used under homogeneous environment only." - assert local_size > 2, ( - "Do no support the case where nodes_per_machine is equal or " - "less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks." - ) - - index = 0 - while True: - machine_id = self_rank // nodes_per_machine - local_rank_id = self_rank % nodes_per_machine - local_rank_to_go_outside_id = index % nodes_per_machine - - if local_rank_to_go_outside_id == local_rank_id: - # find send_rank - target_machine_id = (machine_id + 1) % num_machines - target_rank_id = target_machine_id * nodes_per_machine + local_rank_id - send_rank = target_rank_id - - # find recv_rank - source_machine_id = (machine_id - 1) % num_machines - source_rank_id = source_machine_id * nodes_per_machine + local_rank_id - recv_rank = source_rank_id - - else: - # find send_rank - target_local_rank_id = (local_rank_id + 1) % nodes_per_machine - if target_local_rank_id == local_rank_to_go_outside_id: - target_local_rank_id = (target_local_rank_id + 1) % nodes_per_machine - target_rank_id = target_local_rank_id + machine_id * nodes_per_machine - send_rank = target_rank_id - - # find recv_rank - source_local_rank_id = (local_rank_id - 1) % nodes_per_machine - if source_local_rank_id == local_rank_to_go_outside_id: - source_local_rank_id = (source_local_rank_id - 1) % nodes_per_machine - source_rank_id = source_local_rank_id + machine_id * nodes_per_machine - recv_rank = source_rank_id - - yield [send_rank], [recv_rank] - index += 1 - - -def GetInnerOuterExpo2DynamicSendRecvRanks( - world_size: int, local_size: int, self_rank: int -) -> Iterator[Tuple[List[int], List[int]]]: - """ - A utility function to generate 1-outgoing send rank and corresponding recieving rank(s) - for Inner-Exp2-Outer-Exp2 ring topology. - - Args: - world_size (int): the size of all nodes; world_size = num_machines * nodes_per_machine - local_size (int): number of nodes in each machine - self_rank (int): The self rank. - - Yields: - Iterator[Tuple[List[int], List[int]]]: send_ranks, recv_ranks. - """ - num_machines = world_size // local_size - nodes_per_machine = local_size - assert ( - world_size % local_size == 0 - ), "It should be used under homogeneous environment only." - assert local_size > 2, ( - "Do no support the case where nodes_per_machine is equal or " - "less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks." 
- ) - - exp_2_out_size = int(np.log2(num_machines - 1)) - if nodes_per_machine == 2: - exp_2_in_size = 0 - else: - # -2 because we need to remove outgoing node - exp_2_in_size = int(np.log2(nodes_per_machine - 2)) - - index = 0 - while True: - machine_id = self_rank // nodes_per_machine - local_rank_id = self_rank % nodes_per_machine - local_rank_to_go_outside_id = index % nodes_per_machine - - if local_rank_to_go_outside_id == local_rank_id: - # Note: currently design is still not very good. Because some local rank i may NEVER - # directly talk to other machine's local rank i. Example: - # - # Assume num_machines=16, nodes_per_machine=4, and self_rank=1, then we know that - # exp_2_out_size=3, and local_rank_id=1. If this branch is reached, - # local_rank_to_go_outside_id=1, and index % (exp_2_out_size+1)=1, resulting in - # next_machine_dist always equal to 2. - next_machine_dist = 2 ** (index % (exp_2_out_size + 1)) - # find send_rank - target_machine_id = (machine_id + next_machine_dist) % num_machines - target_rank_id = target_machine_id * nodes_per_machine + local_rank_id - send_rank = target_rank_id - - # find recv_rank - source_machine_id = (machine_id - next_machine_dist) % num_machines - source_rank_id = source_machine_id * nodes_per_machine + local_rank_id - recv_rank = source_rank_id - - else: - # Distance from self to out-rank: - dist_to_out = ( - local_rank_to_go_outside_id - local_rank_id - ) % nodes_per_machine - next_inner_dist = 2 ** (index % (exp_2_in_size + 1)) - if next_inner_dist >= dist_to_out: - next_inner_dist += 1 - - # find send_rank - target_local_rank_id = (local_rank_id + next_inner_dist) % nodes_per_machine - target_rank_id = target_local_rank_id + machine_id * nodes_per_machine - send_rank = target_rank_id - - reverse_inner_dist = 2 ** (index % (exp_2_in_size + 1)) - reverse_dist_to_out = ( - local_rank_id - local_rank_to_go_outside_id - ) % nodes_per_machine - if reverse_inner_dist >= reverse_dist_to_out: - reverse_inner_dist += 1 - - # find recv_rank - source_local_rank_id = ( - local_rank_id - reverse_inner_dist - ) % nodes_per_machine - source_rank_id = source_local_rank_id + machine_id * nodes_per_machine - recv_rank = source_rank_id - - yield [send_rank], [recv_rank] - index += 1 - - -if __name__ == "__main__": - topo = ExponentialGraph(10) - gen = GetDynamicOnePeerSendRecvRanks(topo, 4) - for _ in range(10): - print(next(gen)) diff --git a/bluefoglite/common/torch_backend.py b/bluefoglite/common/torch_backend.py index 17a5e6f..15062f9 100644 --- a/bluefoglite/common/torch_backend.py +++ b/bluefoglite/common/torch_backend.py @@ -18,7 +18,7 @@ import functools from enum import Enum import os -from typing import Any, Dict, List, Tuple, Optional, Union, Callable +from typing import Any, Dict, List, Optional, Union, Callable import networkx as nx import torch @@ -105,7 +105,6 @@ def init( ) self._rank = int(_world_rank_env) if rank is None else rank self._size = int(_world_size_env) if size is None else size - self._backend = "undefined" if backend is None else backend dist.init_process_group( backend=backend, @@ -168,9 +167,6 @@ def set_topology(self, topology: nx.DiGraph) -> bool: ) return True - def load_topology(self, group=None): - return self._topology_and_weights.topology - def isend(self, tensor: torch.Tensor, dst: int, tag: int = 0) -> AsyncWork: self._check_rank(dst) return AsyncWork(work=self.process_group.send([tensor], dstRank=dst, tag=tag)) @@ -213,58 +209,31 @@ def neighbor_allreduce_nonblocking( "Must provide all self_weight, src_weights, 
and dst_weights " "arguments or set static topology." ) - + tmp_recv_tensors = {i: torch.zeros_like(tensor) for i, _ in src_weights.items()} op_list = [] for dst, weight in dst_weights.items(): - comm_tensor = ( - tensor.mul(weight).to("cpu") - if self._backend == "gloo" and tensor.device.type != "cpu" - else tensor.mul(weight) - ) op_list.append( dist.P2POp( - dist.isend, - comm_tensor, - peer=dst, - group=self.process_group, + dist.isend, tensor.mul(weight), peer=dst, group=self.process_group ) ) - src_weights_items = list(src_weights.items()) - tmp_recv_tensors_concat = torch.zeros( - len(src_weights_items), - *tensor.shape, - device=( - "cpu" - if self._backend == "gloo" and tensor.device.type != "cpu" - else tensor.device - ), - ) - for idx, (src, _) in enumerate(src_weights_items): + for src, tmp_tensor in tmp_recv_tensors.items(): op_list.append( - dist.P2POp( - dist.irecv, - tmp_recv_tensors_concat[idx], - peer=src, - group=self.process_group, - ) + dist.P2POp(dist.irecv, tmp_tensor, peer=src, group=self.process_group) ) - reqs = dist.batch_isend_irecv(op_list) def post_func( tensor: torch.Tensor, - tmp_recv_tensors_concat: torch.Tensor, + tmp_recv_tensors: Dict[int, torch.Tensor], self_weight: float, - src_weights_items: List[Tuple[int, float]], + src_weights: Dict[int, float], ) -> torch.Tensor: tensor_ = tensor if inplace else tensor.detach().clone() tensor_.mul_(self_weight) - # move tmp_recv_tensors_concat from cpu to cuda. - if self._backend == "gloo" and tensor.device.type != "cpu": - tmp_recv_tensors_concat = tmp_recv_tensors_concat.to(tensor_.device) - for idx, (_, weight) in enumerate(src_weights_items): - tensor_.add_(tmp_recv_tensors_concat[idx], alpha=weight) - del tmp_recv_tensors_concat + for src, weight in src_weights.items(): + tensor_.add_(tmp_recv_tensors[src].mul_(weight)) + del tmp_recv_tensors return tensor_ return AsyncWork( @@ -272,9 +241,9 @@ def post_func( functools.partial( post_func, tensor=tensor, - tmp_recv_tensors_concat=tmp_recv_tensors_concat, + tmp_recv_tensors=tmp_recv_tensors, self_weight=self_weight, - src_weights_items=src_weights_items, + src_weights=src_weights, ), ) @@ -328,7 +297,7 @@ def allreduce_nonblocking( inplace: bool = True, ) -> AsyncWork: opts = dist.AllreduceOptions() - opts.reduceOp = op.value[0] if op != ReduceOp.AVG else dist.ReduceOp.SUM + opts.reduceOp = op.value if op != ReduceOp.AVG else dist.ReduceOp.SUM _tensor = tensor if inplace else tensor.detach().clone() def post_func(tensor: torch.Tensor, op: ReduceOp, size: int) -> torch.Tensor: diff --git a/bluefoglite/launch/run.py b/bluefoglite/launch/run.py index 5cc4b69..cded2b8 100644 --- a/bluefoglite/launch/run.py +++ b/bluefoglite/launch/run.py @@ -46,24 +46,6 @@ def parse_args(): help="Total number of training processes.", ) - parser.add_argument( - "--master-port", - action="store", - dest="master_port", - type=int, - default=29500, - help="Master port for BluefogLite.", - ) - - parser.add_argument( - "--master-addr", - action="store", - dest="master_addr", - type=str, - default="127.0.0.1", - help="Master addr for BluefogLite.", - ) - parser.add_argument( "command", nargs=argparse.REMAINDER, help="Command to be executed." 
) @@ -117,8 +99,8 @@ def main(): env["BFL_WORLD_SIZE"] = str(args.np) env["BFL_FILE_STORE"] = shared_file_dir # TODO fix this - env["MASTER_ADDR"] = args.master_addr - env["MASTER_PORT"] = str(args.master_port) + env["MASTER_ADDR"] = "127.0.0.1" + env["MASTER_PORT"] = "29500" stdout = None stderr = subprocess.STDOUT diff --git a/bluefoglite/torch_api.py b/bluefoglite/torch_api.py index 4c6a7e1..75ac115 100644 --- a/bluefoglite/torch_api.py +++ b/bluefoglite/torch_api.py @@ -20,15 +20,6 @@ import torch.distributed as dist from bluefoglite.common.torch_backend import AsyncWork, BlueFogLiteGroup, ReduceOp -from bluefoglite.common.optimizers import ( - DistributedAdaptWithCombineOptimizer, - CommunicationType, -) -from bluefoglite.utility import ( - neighbor_allreduce_parameters, - broadcast_parameters, - broadcast_optimizer_state, -) _global_group = BlueFogLiteGroup() @@ -50,8 +41,6 @@ "allreduce_nonblocking", "allreduce", ] - - # import basic methods and wrap it with default global group. @@ -109,12 +98,6 @@ def set_topology(topology: nx.DiGraph, *, group=None): return group.set_topology(topology=topology) -def load_topology(group=None): - if group is None: - group = _global_group - return group.load_topology(group=None) - - def neighbor_allreduce( tensor: torch.Tensor, *, @@ -195,8 +178,3 @@ def allreduce_nonblocking( if group is None: group = _global_group return group.allreduce_nonblocking(tensor=tensor, op=op, inplace=inplace) - - -def barrier(device: str = "cpu") -> None: - # If backend is nccl, device="cuda" or f"cuda:{i}" should be passed in. - allreduce(torch.tensor([1.0], device=device)) diff --git a/bluefoglite/utility.py b/bluefoglite/utility.py deleted file mode 100644 index 73142f6..0000000 --- a/bluefoglite/utility.py +++ /dev/null @@ -1,181 +0,0 @@ -import torch -import bluefoglite.torch_api as bfl -import collections - - -def broadcast_parameters(params, root_rank): - """ - Broadcasts the parameters from root rank to all other processes. - Typical usage is to broadcast the ``model.state_dict()``, - ``model.named_parameters()``, or ``model.parameters()``. - - Arguments: - params: One of the following: - - list of parameters to broadcast - - dict of parameters to broadcast - root_rank: The rank of the process from which parameters will be - broadcasted to all other processes. - """ - if isinstance(params, dict): - params = sorted(params.items()) - elif isinstance(params, list): - # support both named_parameters() and regular parameters() - params = [p if isinstance(p, tuple) else (None, p) for p in params] - else: - raise ValueError("invalid params of type: %s" % type(params)) - - # Run asynchronous broadcasts. - async_works = [] - for name, p in params: - async_work = bfl.broadcast_nonblocking(p, inplace=True, root_rank=root_rank) - async_works.append(async_work) - - # Wait for completion. - for async_work in async_works: - async_work.wait() - - -# TODO[ccy]: only broadcast named_parameters version -def neighbor_allreduce_parameters(params): - if isinstance(params, dict): - params = sorted(params.items()) - elif isinstance(params, list): - # support both named_parameters() and regular parameters() - params = [p if isinstance(p, tuple) else (None, p) for p in params] - else: - raise ValueError("invalid params of type: %s" % type(params)) - - # Run asynchronous broadcasts. - async_works = [] - for name, p in params: - if torch.is_floating_point(p): - async_work = bfl.neighbor_allreduce_nonblocking(p, inplace=True) - async_works.append(async_work) - - # Wait for completion. 
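
The removed broadcast_parameters and neighbor_allreduce_parameters helpers above share one pattern: launch every nonblocking collective first, then wait on all handles so the transfers can overlap. A condensed sketch of that pattern with the retained API follows; broadcast_state_dict is a hypothetical helper name.

import bluefoglite.torch_api as bfl

def broadcast_state_dict(state_dict, root_rank=0):
    # Fire all nonblocking broadcasts before waiting on any of them.
    handles = [
        bfl.broadcast_nonblocking(t, inplace=True, root_rank=root_rank)
        for _, t in sorted(state_dict.items())
    ]
    for handle in handles:
        handle.wait()
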
- for async_work in async_works: - async_work.wait() - - -def broadcast_optimizer_state(optimizer, root_rank, device): - if isinstance(optimizer, torch.optim.LBFGS): - raise ValueError("cannot broadcast torch.optim.LBFGS state") - - state_dict = optimizer.state_dict() - - # Newly created optimizers will not have their state initialized, so - # do that initialization here - if not state_dict["state"]: - for group in optimizer.param_groups: - for p in group["params"]: - p.grad = p.data.new(p.size()).zero_() - # This function accepts a torch.optim.Optimizer or a DistributedOptimizer - # wrapped around a torch optimizer. Calling step() with a DistributedOptimizer - # forces allreduce on all model parameters, which will result in deadlock - # unless every rank calls step(). Therefore, to finish state initialization - # only call optimizer.step() with a torch.optim.Optimizer. - if optimizer.__module__ == bfl.DistributedAdaptWithCombineOptimizer.__module__: - super(optimizer.__class__, optimizer).step() - else: - optimizer.step() - state_dict = optimizer.state_dict() - - # If the state_dict is still empty after initialization, then - # the optimizer is stateless, and there is nothing to broadcast. - # Furthermore, attempting to access the state dict would result in - # an error. - if not state_dict["state"]: - return - - params = [] - callbacks = {} - occurrences = collections.defaultdict(int) - - # Returns the full type structure of the possibly nested objects for recursive casting back - def _get_types(x): - if isinstance(x, collections.Iterable): - return type(x), [_get_types(xi) for xi in x] - else: - return type(x) - - # Casts an object encoded in a tensor back into its original type and subtypes - def _recursive_cast(x, dtype): - if isinstance(dtype, tuple): - t, dtypes = dtype - x = t(x) - return t([_recursive_cast(x[i], dtypes[i]) for i in range(len(x))]) - else: - return dtype(x) - - # Some optimizer parameters may be represented as scalars instead of - # tensors. In such cases, we need to wrap the scalar in a tensor, then - # broadcast, then update the appropriate value in the state_dict with the - # new unwrapped scalar value via a callback. - def _create_callback(pid, name, t, p): - def _from_tensor(): - state_dict["state"][pid][name] = ( - t(p.cpu().numpy()[0]) if device != "cpu" else t(p.numpy()[0]) - ) - - return _from_tensor - - def _create_option_callback(index, option_key, option_tensor, dtypes): - def _from_tensor(): - optimizer.param_groups[index][option_key] = _recursive_cast( - option_tensor.cpu().numpy()[0] - if device != "cpu" - else option_tensor.numpy()[0], - dtypes, - ) - - return _from_tensor - - # Param groups are an ordered list, normally there is only one per model, - # but users can add additional param groups for example to train - # previously frozen layers - for index, group in enumerate(state_dict["param_groups"]): - # Broadcast options like learning rate - for option_key, option_value in group.items(): - # TODO: what if option_value is None? 
- if option_value is None: - continue - if option_key == "params": - continue - # Options like the learning rate are scalar, and need to be wrapped in tensors - key = "%s.%d" % (option_key, index) - dtypes = _get_types(option_value) - option_tensor = torch.tensor([option_value], device=device) - callbacks[key] = _create_option_callback( - index, option_key, option_tensor, dtypes - ) - params.append((key, option_tensor)) - - # The params list here is ordered by the layers in the model - for pid in group["params"]: - param_state = state_dict["state"][pid] - for name, p in param_state.items(): - # Some parameter names may appear more than once, in which - # case we ensure they have a unique identifier defined by - # their order - # TODO: what if option_value is None? - if p is None: - continue - occurrences[name] += 1 - key = "%s.%d" % (str(name), occurrences[name]) - - if not torch.is_tensor(p): - # Wrap the scalar in a FloatTensor, and remember its type - # so we can cast it back after unwrapping - t = type(p) - p = torch.tensor([p], device=device) - callbacks[key] = _create_callback(pid, name, t, p) - - params.append((key, p)) - - # Synchronized broadcast of all parameters - broadcast_parameters(params, root_rank) - - # Post-broadcast clenaup for non-tensor parameters - for key, p in params: - if key in callbacks: - callbacks[key]() diff --git a/example/torch_allreduce.py b/example/torch_allreduce.py deleted file mode 100644 index abbbce3..0000000 --- a/example/torch_allreduce.py +++ /dev/null @@ -1,13 +0,0 @@ -import torch -import bluefoglite -import bluefoglite.torch_api as bfl - -bfl.init() -bfl.set_topology(topology=bluefoglite.RingGraph(bfl.size())) -print(f"I am rank {bfl.rank()} among size {bfl.size()}.") - -tensor = torch.zeros(2) + bfl.rank() -print("Rank ", bfl.rank(), " has data ", tensor) - -bfl.allreduce(tensor, inplace=True) -print("Rank ", bfl.rank(), " has data ", tensor, " and output ", tensor) diff --git a/example/torch_avg.py b/example/torch_avg.py deleted file mode 100644 index 60ca170..0000000 --- a/example/torch_avg.py +++ /dev/null @@ -1,43 +0,0 @@ -import torch -import bluefoglite -import bluefoglite.torch_api as bfl - -import argparse -from bluefoglite.common.torch_backend import AsyncWork, BlueFogLiteGroup, ReduceOp - -parser = argparse.ArgumentParser( - description="Bluefog-Lite Average Example", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, -) -parser.add_argument("--data-size", type=int, default=4) -parser.add_argument("--max-iters", type=int, default=200) -parser.add_argument("--plot-interactive", action="store_true") -parser.add_argument("--backend", type=str, default="gloo") -parser.add_argument("--consensus-method", type=str, default="neighbor_allreduce") -args = parser.parse_args() - -# choices: gloo, mpi, nccl -bfl.init(backend="gloo") -bfl.set_topology(bluefoglite.RingGraph(bfl.size(), connect_style=0)) - -device = bfl.rank() % torch.cuda.device_count() -x = torch.randn(args.data_size, device=device, dtype=torch.double) -x_bar = bfl.allreduce(x, op=ReduceOp.AVG) -mse = [torch.norm(x - x_bar, p=2) / torch.norm(x_bar, p=2)] - - -for ite in range(args.max_iters): - x = eval(f"bfl.{args.consensus_method}(x, inplace=False)") - mse.append(torch.norm(x - x_bar, p=2) / torch.norm(x_bar, p=2)) - - -mse = [m.item() for m in mse] -print("MSE at last iteration: ", mse[-1]) -if args.plot_interactive and bfl.rank() == 0: - import matplotlib.pyplot as plt - - plt.semilogy(mse) - plt.savefig(f"./img/torch_avg_{args.consensus_method}.png") - plt.show() - 
plt.close() -bfl.shutdown() diff --git a/example/torch_broadcast.py b/example/torch_broadcast.py deleted file mode 100644 index 14a02e9..0000000 --- a/example/torch_broadcast.py +++ /dev/null @@ -1,14 +0,0 @@ -import torch -import torch.distributed as dist -import bluefoglite -import bluefoglite.torch_api as bfl - -bfl.init() -bfl.set_topology(topology=bluefoglite.RingGraph(bfl.size())) -print(f"I am rank {bfl.rank()} among size {bfl.size()}.") - -tensor = torch.zeros(2) + bfl.rank() -print("Rank ", bfl.rank(), " has data ", tensor) - -bfl.broadcast(tensor, root_rank=0, inplace=True) -print("Rank ", bfl.rank(), " has data ", tensor, " and output ", tensor) diff --git a/example/torch_cifar10.py b/example/torch_cifar10.py deleted file mode 100644 index 8131a47..0000000 --- a/example/torch_cifar10.py +++ /dev/null @@ -1,326 +0,0 @@ -import argparse -import os - -from torchvision import datasets, transforms -import torch.utils.data.distributed -import torch.nn.functional as F - -import bluefoglite.torch_api as bfl -from bluefoglite.common import topology -from model import ResNet20, ResNet32, ResNet44, ResNet56, ViT - -# Args -parser = argparse.ArgumentParser( - description="Bluefog-Lite Example on MNIST", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, -) -parser.add_argument("--model", type=str, default="resnet20", help="model to use") -parser.add_argument( - "--batch-size", type=int, default=64, help="input batch size for training" -) -parser.add_argument( - "--test-batch-size", type=int, default=64, help="input batch size for testing" -) -parser.add_argument("--epochs", type=int, default=5, help="number of epochs to train") -parser.add_argument("--lr", type=float, default=0.01, help="learning rate") -parser.add_argument("--momentum", type=float, default=0.9, help="momentum") -parser.add_argument( - "--dist-optimizer", - type=str, - default="neighbor_allreduce", - help="The type of distributed optimizer. Supporting options are [neighbor_allreduce, allreduce]", - choices=["neighbor_allreduce", "allreduce"], -) -parser.add_argument( - "--communicate-state-dict", - action="store_true", - default=False, - help="If True, communicate state dictionary of model instead of named parameters", -) -parser.add_argument( - "--log-interval", - type=int, - default=100, - help="how many batches to wait before logging training status", -) -parser.add_argument( - "--no_cuda", action="store_true", default=False, help="disables CUDA training" -) - -parser.add_argument( - "--seed", type=int, default=42, metavar="S", help="random seed (default: 42)" -) -parser.add_argument( - "--backend", - type=str, - default="gloo", - choices=["gloo", "nccl"], -) -parser.add_argument( - "--profiling", - type=str, - default="no_profiling", - metavar="S", - help="enable which profiling? 
default: no", - choices=["no_profiling", "c_profiling", "torch_profiling"], -) -parser.add_argument( - "--disable-dynamic-topology", - action="store_true", - default=False, - help="Disable each iteration to transmit one neighbor per iteration dynamically.", -) - -args = parser.parse_args() -args.cuda = not args.no_cuda and torch.cuda.is_available() - -# Initialize topology -bfl.init(backend=args.backend) -topo = topology.RingGraph(bfl.size()) -bfl.set_topology(topo) -if not args.disable_dynamic_topology: - dynamic_neighbor_allreduce_gen = topology.GetDynamicOnePeerSendRecvRanks( - bfl.load_topology(), bfl.rank() - ) - -# Device -if args.cuda: - print("using cuda.") - device_id = bfl.rank() % torch.cuda.device_count() - torch.cuda.set_device(device_id) - torch.cuda.manual_seed(args.seed) - device = torch.tensor([0.0]).cuda().device -else: - print("using cpu") - torch.manual_seed(args.seed) - device = "cpu" - -# Dataloader -kwargs = {} -data_folder_loc = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") -transform_train = transforms.Compose( - [ - transforms.RandomCrop(32, padding=4), - transforms.RandomHorizontalFlip(), - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ] -) -transform_test = transforms.Compose( - [ - transforms.ToTensor(), - transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), - ] -) -train_dataset = datasets.CIFAR10( - root="./data", train=True, download=True, transform=transform_train -) -train_sampler = torch.utils.data.distributed.DistributedSampler( - train_dataset, num_replicas=bfl.size(), rank=bfl.rank(), seed=args.seed -) -train_loader = torch.utils.data.DataLoader( - train_dataset, - batch_size=args.batch_size, - sampler=train_sampler, - **kwargs, -) -test_dataset = datasets.CIFAR10( - root="./data", train=False, download=True, transform=transform_test -) -test_sampler = torch.utils.data.distributed.DistributedSampler( - test_dataset, num_replicas=bfl.size(), rank=bfl.rank(), seed=args.seed -) -test_loader = torch.utils.data.DataLoader( - test_dataset, - batch_size=args.test_batch_size, - sampler=test_sampler, - **kwargs, -) - -# model -if args.model == "resnet20": - model = ResNet20() -elif args.model == "resnet32": - model = ResNet32() -elif args.model == "resnet44": - model = ResNet44() -elif args.model == "resnet56": - model = ResNet56() -elif args.model == "vit_tiny": - model = ViT() -else: - raise NotImplementedError("model not implemented") -if args.cuda: - model.cuda() - -# Optimizer & Scheduler -optimizer = torch.optim.SGD( - model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=5e-4 -) -base_dist_optimizer = bfl.DistributedAdaptWithCombineOptimizer -if args.dist_optimizer == "allreduce": - optimizer = base_dist_optimizer( - optimizer, model=model, communication_type=bfl.CommunicationType.allreduce - ) -elif args.dist_optimizer == "neighbor_allreduce": - optimizer = base_dist_optimizer( - optimizer, - model=model, - communication_type=bfl.CommunicationType.neighbor_allreduce, - ) -else: - raise ValueError( - "Unknown args.dist-optimizer type -- " - + args.dist_optimizer - + "\n" - + "Please set the argument to be one of " - + "[neighbor_allreduce, gradient_allreduce, allreduce, " - + "hierarchical_neighbor_allreduce, win_put, horovod]" - ) -scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=args.epochs) - -# Broadcast parameters & optimizer state -bfl.broadcast_parameters(model.state_dict(), root_rank=0) -bfl.broadcast_optimizer_state( 
- optimizer, root_rank=0, device=next(model.parameters()).device -) - - -def dynamic_topology_update(epoch, batch_idx): - if args.dist_optimizer == "neighbor_allreduce": - send_neighbors, recv_neighbors = next(dynamic_neighbor_allreduce_gen) - assert len(send_neighbors) == len(recv_neighbors) - optimizer.dst_weights = { - r: 1 / (len(send_neighbors) + 1) for r in send_neighbors - } - optimizer.src_weights = { - r: 1 / (len(recv_neighbors) + 1) for r in recv_neighbors - } - optimizer.self_weight = 1 / (len(recv_neighbors) + 1) - else: - pass - - -def metric_average(val): - tensor = torch.tensor(val, device=device) - avg_tensor = bfl.allreduce(tensor) - return avg_tensor.item() - - -def train(epoch): - model.train() - train_loss, correct, total = 0, 0, 0 - for batch_idx, (data, targets) in enumerate(train_loader): - if not args.disable_dynamic_topology: - dynamic_topology_update(epoch, batch_idx) - if args.cuda: - data, targets = data.cuda(), targets.cuda() - optimizer.zero_grad() - outputs = model(data) - loss = F.cross_entropy(outputs, targets) - loss.backward() - # TODO[1]: Implement unit test to check whether params in different workers are same after allreduce/neighbor_allreduce - optimizer.step() - # Calculate metric - train_loss += loss.item() - _, pred = outputs.max(dim=1) - total += targets.size(dim=0) - correct += pred.eq(targets).sum().item() - - if (batch_idx + 1) % args.log_interval == 0: - print( - "[{}] Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t".format( - bfl.rank(), - epoch, - total, - len(train_sampler), - 100.0 * total / len(train_sampler), - train_loss / (batch_idx + 1), - ) - ) - train_accuracy = correct / total - # Bluefog: average metric values across workers. - train_loss = metric_average(train_loss) - train_accuracy = metric_average(train_accuracy) - if bfl.rank() == 0: - print( - "\nTrain Epoch: {}\tAverage loss: {:.6f}\tAccuracy: {:.4f}%\n".format( - epoch, train_loss / len(train_loader), 100.0 * train_accuracy - ), - flush=True, - ) - - -def test(epoch): - model.eval() - test_loss, correct, total = 0, 0, 0 - for data, targets in test_loader: - if args.cuda: - data, targets = data.cuda(), targets.cuda() - outputs = model(data) - loss = F.cross_entropy(outputs, targets) - - test_loss += loss.item() - _, pred = outputs.max(dim=1) - total += targets.size(dim=0) - correct += pred.eq(targets).sum().item() - - test_accuracy = correct / total - # Bluefog: average metric values across workers. - test_loss = metric_average(test_loss) - test_accuracy = metric_average(test_accuracy) - if bfl.rank() == 0: - print( - "\nTest Epoch: {}\tAverage loss: {:.6f}\tAccuracy: {:.4f}%\n".format( - epoch, test_loss / len(test_loader), 100.0 * test_accuracy - ), - flush=True, - ) - - -if args.profiling == "c_profiling": - if bfl.rank() == 0: - import cProfile - import pstats - - profiler = cProfile.Profile() - profiler.enable() - train(0) - profiler.disable() - # redirect to ./output_static.txt or ./output_dynamic.txt - with open( - f"output_{'static' if args.disable_dynamic_topology else 'dynamic'}.txt", - "w", - ) as file: - stats = pstats.Stats(profiler, stream=file).sort_stats("tottime") - stats.print_stats() - else: - train(0) -elif args.profiling == "torch_profiling": - from torch.profiler import profile, ProfilerActivity - import contextlib - - assert args.backend != "nccl", "NCCL backend does not support torch_profiling." 
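
The dynamic_topology_update helper above ultimately feeds a weighted neighbor allreduce; a minimal standalone sketch of the equivalent direct call, assuming bfl.init() has been called, ranks 1 and 3 are live peers, and the uniform 1/(n+1) weighting used by the deleted example.

import torch
import bluefoglite.torch_api as bfl

send_neighbors, recv_neighbors = [1], [3]   # hypothetical neighbors for this step
uniform = 1.0 / (len(recv_neighbors) + 1)

x = torch.randn(4)
work = bfl.neighbor_allreduce_nonblocking(
    x,
    self_weight=uniform,
    src_weights={r: uniform for r in recv_neighbors},
    dst_weights={r: 1.0 / (len(send_neighbors) + 1) for r in send_neighbors},
)
x_mixed = work.wait()                       # weighted average of own and received tensors
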
- - if bfl.rank() == 0: - with profile( - activities=[ProfilerActivity.CUDA, ProfilerActivity.CPU], record_shapes=True - ) as prof: - train(0) - # redirect to ./output_static.txt or ./output_dynamic.txt - with open( - f"output_{'static' if args.disable_dynamic_topology else 'dynamic'}.txt", - "w", - ) as file: - with contextlib.redirect_stdout(file): - print(prof.key_averages().table(sort_by="cpu_time_total")) - else: - train(0) -else: - for e in range(args.epochs): - train(e) - test(e) - scheduler.step() - -bfl.barrier(device=device) -print(f"rank {bfl.rank()} finished.") diff --git a/example/torch_mnist.py b/example/torch_mnist.py deleted file mode 100644 index 2089751..0000000 --- a/example/torch_mnist.py +++ /dev/null @@ -1,170 +0,0 @@ -import argparse -import copy -import os -import tqdm -import numpy as np - -from torchvision import datasets, transforms -import torch.utils.data.distributed -import torch.nn.functional as F - -import bluefoglite.torch_api as bfl -import bluefoglite.utility as bfl_util -from bluefoglite.common import topology -from bluefoglite.common.torch_backend import ReduceOp -from model import MLP - -# Args -parser = argparse.ArgumentParser( - description="Bluefog-Lite Example on MNIST", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, -) -parser.add_argument( - "--batch_size", type=int, default=16, help="input batch size for training" -) -parser.add_argument( - "--test_batch_size", type=int, default=16, help="input batch size for testing" -) -parser.add_argument("--epochs", type=int, default=5, help="number of epochs to train") -parser.add_argument("--lr", type=float, default=0.001, help="learning rate") -parser.add_argument( - "--log_interval", - type=int, - default=100, - help="how many batches to wait before logging training status", -) -parser.add_argument( - "--no_cuda", action="store_true", default=False, help="disables CUDA training" -) -parser.add_argument( - "--seed", type=int, default=42, metavar="S", help="random seed (default: 42)" -) - -args = parser.parse_args() -args.cuda = not args.no_cuda and torch.cuda.is_available() - -# Initialize topology -bfl.init() -topo = topology.RingGraph(bfl.size()) -bfl.set_topology(topo) - -# Device -if args.cuda: - print("using cuda.") - device_id = bfl.rank() % torch.cuda.device_count() - torch.cuda.set_device(device_id) - torch.cuda.manual_seed(args.seed) -else: - print("using cpu") - torch.manual_seed(args.seed) - -# Dataloader -kwargs = {} -data_folder_loc = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") - -train_dataset = datasets.MNIST( - os.path.join(data_folder_loc, "data", "data-%d" % bfl.rank()), - train=True, - download=True, - transform=transforms.Compose( - [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] - ), -) -train_sampler = torch.utils.data.distributed.DistributedSampler( - train_dataset, num_replicas=bfl.size(), rank=bfl.rank() -) -train_loader = torch.utils.data.DataLoader( - train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs -) - -test_dataset = datasets.MNIST( - os.path.join(data_folder_loc, "data", "data-%d" % bfl.rank()), - train=False, - transform=transforms.Compose( - [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] - ), -) -test_sampler = torch.utils.data.distributed.DistributedSampler( - test_dataset, num_replicas=bfl.size(), rank=bfl.rank() -) -test_loader = torch.utils.data.DataLoader( - test_dataset, batch_size=args.test_batch_size, sampler=test_sampler, **kwargs -) - -# model -model = MLP() -if 
args.cuda: - model.cuda() - - -# Optimizer -optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9) - -# Broadcast parameters & optimizer state -bfl_util.broadcast_parameters(model.state_dict(), root_rank=0) - - -def metric_average(val): - tensor = torch.tensor(val) - avg_tensor = bfl.allreduce(tensor) - return avg_tensor.item() - - -def train(epoch): - model.train() - for batch_idx, (data, target) in enumerate(train_loader): - optimizer.zero_grad() - if args.cuda: - data, target = data.cuda(), target.cuda() - output = model(data) - loss = F.cross_entropy(output, target) - loss.backward() - optimizer.step() - with torch.no_grad(): - # TODO[1]: Implement unit test to check whether params in different workers are same after allreduce - # TODO[2]: Write a function to sychronize the parameters in different workers - for module in model.parameters(): - bfl.allreduce(module.data, op=ReduceOp.AVG, inplace=True) - if batch_idx % args.log_interval == 0: - print( - "[{}] Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t".format( - bfl.rank(), - epoch, - batch_idx * len(data), - len(train_sampler), - 100.0 * batch_idx / len(train_loader), - loss.item(), - ) - ) - - -def test(epoch): - model.eval() - test_loss, test_accuracy, total = 0.0, 0.0, 0.0 - for data, target in test_loader: - if args.cuda: - data, target = data.cuda(), target.cuda() - output = model(data) - pred = output.data.max(1, keepdim=True)[1] - test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum().item() - test_loss += F.cross_entropy(output, target, reduction="sum").item() - total += len(target) - test_loss /= total - test_accuracy /= total - # Bluefog: average metric values across workers. - test_loss = metric_average(test_loss) - test_accuracy = metric_average(test_accuracy) - if bfl.rank() == 0: - print( - "\nTest Epoch: {}\tAverage loss: {:.6f}\tAccuracy: {:.4f}%\n".format( - epoch, test_loss, 100.0 * test_accuracy - ), - flush=True, - ) - - -for e in range(args.epochs): - train(e) - test(e) -bfl.barrier() -print(f"rank {bfl.rank()} finished.") diff --git a/example/torch_neighbor_allreduce.py b/example/torch_neighbor_allreduce.py deleted file mode 100644 index a2d882b..0000000 --- a/example/torch_neighbor_allreduce.py +++ /dev/null @@ -1,21 +0,0 @@ -import torch -import bluefoglite -import bluefoglite.torch_api as bfl - -bfl.init() -bfl.set_topology(topology=bluefoglite.RingGraph(bfl.size())) -print(f"I am rank {bfl.rank()} among size {bfl.size()}.") - -x = torch.nn.Parameter( - torch.arange(bfl.rank(), bfl.rank() + 4).float(), requires_grad=True -) -y = torch.dot(x, x) -y.backward() - -print("Rank ", bfl.rank(), " x.data: ", x.data) -print("Rank ", bfl.rank(), " x.grad: ", x.grad) - -bfl.neighbor_allreduce(x.data, inplace=True) -bfl.neighbor_allreduce(x.grad, inplace=True) -print("Rank ", bfl.rank(), " x.data: ", x.data) -print("Rank ", bfl.rank(), " x.grad: ", x.grad) diff --git a/model/__init__.py b/model/__init__.py deleted file mode 100644 index 4c8a52e..0000000 --- a/model/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .mlp import MLP -from .resnet import ResNet20, ResNet32, ResNet44, ResNet56 -from .vit import ViT diff --git a/model/mlp.py b/model/mlp.py deleted file mode 100644 index 1d7b106..0000000 --- a/model/mlp.py +++ /dev/null @@ -1,21 +0,0 @@ -import torch -import torch.nn as nn - - -def MLP(): - return nn.Sequential( - nn.Flatten(), - nn.Linear(784, 256), - nn.ReLU(), - nn.Linear(256, 10), - ) - - -def test(): - net = MLP() - y = net(torch.randn(16, 1, 28, 28)) - print(y.size()) - - 
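
The deleted MNIST example above keeps workers consistent by averaging parameters after every local optimizer step; a condensed form of that consensus step using the retained allreduce API (average_parameters is a hypothetical helper name).

import torch
import bluefoglite.torch_api as bfl
from bluefoglite.common.torch_backend import ReduceOp

def average_parameters(model: torch.nn.Module) -> None:
    # Average every parameter across all workers, in place.
    with torch.no_grad():
        for p in model.parameters():
            bfl.allreduce(p.data, op=ReduceOp.AVG, inplace=True)
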
-if __name__ == "__main__": - print("main") diff --git a/model/resnet.py b/model/resnet.py deleted file mode 100644 index 8def732..0000000 --- a/model/resnet.py +++ /dev/null @@ -1,116 +0,0 @@ -import torch -import torch.nn as nn - - -def conv3x3(in_channels, out_channels, stride=1) -> nn.Conv2d: - return nn.Conv2d( - in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False - ) - - -class BasicBlock(nn.Module): - expansion = 1 - - def __init__(self, in_channels, out_channels, stride=1, downsample=None): - super(BasicBlock, self).__init__() - self.conv1 = conv3x3(in_channels, out_channels, stride) - self.bn1 = nn.BatchNorm2d(out_channels) - self.relu = nn.ReLU(inplace=True) - self.conv2 = conv3x3(out_channels, out_channels) - self.bn2 = nn.BatchNorm2d(out_channels) - self.downsample = downsample - - def forward(self, x): - identity = x - - out = self.conv1(x) - out = self.bn1(out) - out = self.relu(out) - - out = self.conv2(out) - out = self.bn2(out) - - if self.downsample is not None: - identity = self.downsample(x) - - # out += identity - out = out.expand_as(identity) + identity - self.relu(out) - - return out - - -class ResNet(nn.Module): - def __init__(self, block, layers, num_classes=10): - super(ResNet, self).__init__() - self.in_channels = 16 - self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False) - self.bn1 = nn.BatchNorm2d(16) - self.relu = nn.ReLU(inplace=True) - self.layer1 = self._make_layer(block, 16, layers[0]) - self.layer2 = self._make_layer(block, 32, layers[1], stride=2) - self.layer3 = self._make_layer(block, 64, layers[2], stride=2) - self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) - self.fc = nn.Linear(64 * block.expansion, num_classes) - - def _make_layer(self, block, out_channels, blocks, stride=1): - downsample = None - if stride != 1 or self.in_channels != out_channels * block.expansion: - downsample = nn.Sequential( - conv3x3(self.in_channels, out_channels * block.expansion, stride), - nn.BatchNorm2d(out_channels * block.expansion), - ) - - layers = [] - layers.append(block(self.in_channels, out_channels, stride, downsample)) - self.in_channels = out_channels * block.expansion - for _ in range(1, blocks): - layers.append(block(self.in_channels, out_channels)) - - return nn.Sequential(*layers) - - def forward(self, x): - x = self.conv1(x) - x = self.bn1(x) - x = self.relu(x) - - x = self.layer1(x) - x = self.layer2(x) - x = self.layer3(x) - - x = self.avgpool(x) - x = torch.flatten(x, 1) - x = self.fc(x) - - return x - - -def ResNet20(): - return ResNet(BasicBlock, [3, 3, 3]) - - -def ResNet32(): - # 32 = (3 * 5 * 2) + 2: 5 blocks per layer - return ResNet(BasicBlock, [5, 5, 5]) - - -def ResNet44(): - # 44 = (3 * 7 * 2) + 2: 7 blocks per layer - return ResNet(BasicBlock, [7, 7, 7]) - - -def ResNet56(): - # 56 = (3 * 9 * 2) + 2: 9 blocks per layer - return ResNet(BasicBlock, [9, 9, 9]) - - -if __name__ == "__main__": - model = ResNet20() - for name in model.state_dict(): - print(name) - print() - for p in model.named_parameters(): - print(p[0]) - print(len(model.state_dict())) - print(len(list(model.parameters()))) - print(len(list(model.named_parameters()))) diff --git a/model/vit.py b/model/vit.py deleted file mode 100644 index 48a9457..0000000 --- a/model/vit.py +++ /dev/null @@ -1,145 +0,0 @@ -import torch -import torch.nn as nn -import torch.nn.functional as F -from torch.profiler import profile, record_function, ProfilerActivity - - -class ViT(nn.Module): - def __init__( - self, - in_channels: int = 3, - num_classes: 
int = 10, - img_size: int = 32, - patch: int = 8, - dropout: float = 0.0, - num_layers: int = 7, - hidden: int = 384, - mlp_hidden: int = 384, - head: int = 8, - is_cls_token: bool = True, - ): - super(ViT, self).__init__() - self.patch = patch - self.is_cls_token = is_cls_token - self.patch_size = img_size // self.patch - f = (img_size // self.patch) ** 2 * 3 # 48 # patch vec length - num_tokens = (self.patch**2) + 1 if self.is_cls_token else (self.patch**2) - - self.emb = nn.Linear(f, hidden) # (b, n, f) - self.cls_token = ( - nn.Parameter(torch.randn(1, 1, hidden)) if is_cls_token else None - ) - self.pos_emb = nn.Parameter(torch.randn(1, num_tokens, hidden)) - enc_list = [ - TransformerEncoder( - hidden, mlp_hidden=mlp_hidden, dropout=dropout, head=head - ) - for _ in range(num_layers) - ] - self.enc = nn.Sequential(*enc_list) - self.fc = nn.Sequential( - nn.LayerNorm(hidden), nn.Linear(hidden, num_classes) # for cls_token - ) - - def forward(self, x): - out = self._to_words(x) - out = self.emb(out) - if self.is_cls_token: - out = torch.cat([self.cls_token.repeat(out.size(0), 1, 1), out], dim=1) - out = out + self.pos_emb - out = self.enc(out) - if self.is_cls_token: - out = out[:, 0] - else: - out = out.mean(1) - out = self.fc(out) - return out - - def _to_words(self, x): - """ - (b, c, h, w) -> (b, n, f) - """ - out = ( - x.unfold(2, self.patch_size, self.patch_size) - .unfold(3, self.patch_size, self.patch_size) - .permute(0, 2, 3, 4, 5, 1) - ) - out = out.reshape(x.size(0), self.patch**2, -1) - return out - - -class TransformerEncoder(nn.Module): - def __init__( - self, feats: int, mlp_hidden: int, head: int = 8, dropout: float = 0.0 - ): - super(TransformerEncoder, self).__init__() - self.la1 = nn.LayerNorm(feats) - self.msa = MultiHeadSelfAttention(feats, head=head, dropout=dropout) - self.la2 = nn.LayerNorm(feats) - self.mlp = nn.Sequential( - nn.Linear(feats, mlp_hidden), - nn.GELU(), - nn.Dropout(dropout), - nn.Linear(mlp_hidden, feats), - nn.GELU(), - nn.Dropout(dropout), - ) - - def forward(self, x): - out = self.msa(self.la1(x)) + x - out = self.mlp(self.la2(out)) + out - return out - - -class MultiHeadSelfAttention(nn.Module): - def __init__(self, feats: int, head: int = 8, dropout: float = 0.0): - super(MultiHeadSelfAttention, self).__init__() - self.head = head - self.feats = feats - self.sqrt_d = self.feats**0.5 - - self.q = nn.Linear(feats, feats) - self.k = nn.Linear(feats, feats) - self.v = nn.Linear(feats, feats) - - self.o = nn.Linear(feats, feats) - self.dropout = nn.Dropout(dropout) - - def forward(self, x): - b, n, f = x.size() - q = self.q(x).view(b, n, self.head, self.feats // self.head).transpose(1, 2) - k = self.k(x).view(b, n, self.head, self.feats // self.head).transpose(1, 2) - v = self.v(x).view(b, n, self.head, self.feats // self.head).transpose(1, 2) - - score = F.softmax( - torch.einsum("bhif, bhjf->bhij", q, k) / self.sqrt_d, dim=-1 - ) # (b,h,n,n) - attn = torch.einsum("bhij, bhjf->bihf", score, v) # (b,n,h,f//h) - o = self.dropout(self.o(attn.flatten(2))) - return o - - -if __name__ == "__main__": - b, c, h, w = 4, 3, 32, 32 - # net - net = ViT( - in_channels=c, - num_classes=10, - img_size=h, - patch=16, - dropout=0.1, - num_layers=7, - hidden=384, - head=12, - mlp_hidden=384, - is_cls_token=False, - ).cuda() - print(net) - # torchsummary.summary(net, (c, h, w)) - # inference - with profile(activities=[ProfilerActivity.CPU], record_shapes=True) as prof: - # with record_function("model_inference"): - x = torch.randn(b, c, h, w).cuda() - out = 
net(x) - out.mean().backward() - print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=20))
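
The einsum pair in the deleted MultiHeadSelfAttention is easy to misread, so here is a small standalone shape check with illustrative sizes (the deleted ViT defaults to hidden=384; the head count varies by configuration).

import torch

b, n, feats, head = 2, 65, 384, 8
q = torch.randn(b, head, n, feats // head)   # layout after .view(...).transpose(1, 2)
k, v = torch.randn_like(q), torch.randn_like(q)

score = torch.softmax(torch.einsum("bhif,bhjf->bhij", q, k) / feats**0.5, dim=-1)
attn = torch.einsum("bhij,bhjf->bihf", score, v)  # output layout: (b, n, head, feats // head)
out = attn.flatten(2)                             # (b, n, feats), fed to the output Linear
assert out.shape == (b, n, feats)
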