
Commit

fix simple lint error
cccvs committed Mar 11, 2024
1 parent c9ddf6f commit 87e5383
Showing 5 changed files with 25 additions and 25 deletions.
10 changes: 5 additions & 5 deletions bluefoglite/common/optimizers.py
@@ -22,7 +22,8 @@ class CommunicationType(Enum):
)
_warning_message_backward_pass_per_step = (
"Unexpected behavior:\n"
" After num_steps_per_communication times of backward computation `loss.backward()` are called,\n"
" After num_steps_per_communication times of backward"
" computation `loss.backward()` are called,\n"
" an optimizer step() function must be called.\n"
" It does not matter how many step() functions are called in between.\n"
" Please adjust num_steps_per_communication to accumulate gradients locally.\n"
@@ -58,11 +59,11 @@ def _check_named_parameters(optimizer, model):
"model.named_parameters()."
)

-dups = list(set([k for k, _ in named_parameters]))
+dups = list({k for k, _ in named_parameters})
if dups:
raise ValueError(
"Parameter names in named_parameters must be unique. "
"Found duplicates: %s" % ", ".join(dups)
f"Found duplicates: {', '.join(dups)}"
)

all_param_ids = {
@@ -74,7 +75,7 @@ def _check_named_parameters(optimizer, model):
raise ValueError(
"Named parameters provided by model are mismatch with the parameters"
"handled by optimizer. Python object ids: "
"%s" % ", ".join(str(id) for id in unnamed_param_ids)
f"{', '.join(str(id) for id in unnamed_param_ids)}"
)
return named_parameters, _models

@@ -83,7 +84,6 @@ class _DistributedReduceOptimizer(torch.optim.Optimizer):
def __init__(
self, params, model, communication_type, num_steps_per_communication=1
):
-print(self.__bases__)
super(self.__class__, self).__init__(params)

named_parameters, models = _check_named_parameters(self, model)
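
The optimizers.py changes are pure lint cleanup: a set comprehension replaces set([...]), f-strings replace %-formatting (the patterns pylint reports as consider-using-set-comprehension and consider-using-f-string), an over-long warning string is split, and a leftover print(self.__bases__) call is removed. A standalone sketch of the two rewrite patterns, with illustrative names rather than repository code:

    names = ["weight", "bias", "weight"]

    # Before: unique = list(set([n for n in names]))
    unique = list({n for n in names})

    # Before: msg = "Found duplicates: %s" % ", ".join(unique)
    msg = f"Found duplicates: {', '.join(unique)}"
    print(msg)
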
6 changes: 4 additions & 2 deletions bluefoglite/common/topology.py
@@ -291,7 +291,8 @@ def GetInnerOuterRingDynamicSendRecvRanks(
), "It should be used under homogeneous environment only."
assert local_size > 2, (
"Do no support the case where nodes_per_machine is equal or "
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."
"less than 2. Consider use hierarchical_neighbor_allreduce "
"or GetDynamicOnePeerSendRecvRanks."
)

index = 0
@@ -352,7 +353,8 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(
), "It should be used under homogeneous environment only."
assert local_size > 2, (
"Do no support the case where nodes_per_machine is equal or "
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."
"less than 2. Consider use hierarchical_neighbor_allreduce or"
" GetDynamicOnePeerSendRecvRanks."
)

exp_2_out_size = int(np.log2(num_machines - 1))
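
Both topology.py edits only split an over-long assertion message across two string literals. Adjacent literals inside parentheses are concatenated at compile time, so the message the user sees is unchanged. A minimal check of that equivalence, not repository code:

    one_line = "Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."
    split = (
        "Consider use hierarchical_neighbor_allreduce "
        "or GetDynamicOnePeerSendRecvRanks."
    )
    assert one_line == split  # implicit concatenation yields the identical string
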
12 changes: 6 additions & 6 deletions bluefoglite/common/torch_backend.py
@@ -74,6 +74,7 @@ def __init__(self) -> None:
self._size: Optional[int] = None
self._topology_and_weights: Optional[TopologyAndWeights] = None
self._process_group: Optional[dist.ProcessGroup] = None
+self._backend: Optional[str] = None

@property
def process_group(self):
@@ -216,15 +217,14 @@ def neighbor_allreduce_nonblocking(

op_list = []
for dst, weight in dst_weights.items():
-comm_tensor = (
-tensor.mul(weight).to("cpu")
-if self._backend == "gloo" and tensor.device.type != "cpu"
-else tensor.mul(weight)
-)
op_list.append(
dist.P2POp(
dist.isend,
-comm_tensor,
+(
+tensor.mul(weight).to("cpu")
+if self._backend == "gloo" and tensor.device.type != "cpu"
+else tensor.mul(weight)
+),
peer=dst,
group=self.process_group,
)
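
Two things change in torch_backend.py: self._backend is now declared in __init__ (otherwise pylint reports attribute-defined-outside-init when it is first assigned elsewhere), and the weighted-tensor expression is folded directly into the dist.P2POp call. That expression stages the tensor on CPU whenever the gloo backend is paired with a GPU tensor, since gloo point-to-point ops generally require CPU tensors. A standalone sketch of the staging logic with a hypothetical helper name, not the repository API:

    import torch

    def prep_comm_tensor(tensor, weight, backend):
        # Weight the tensor, then copy it to CPU only if a CPU-only backend
        # (gloo) would otherwise be handed a GPU tensor.
        out = tensor.mul(weight)
        if backend == "gloo" and tensor.device.type != "cpu":
            out = out.to("cpu")
        return out

    print(prep_comm_tensor(torch.ones(3), 0.5, "gloo"))  # tensor([0.5000, 0.5000, 0.5000])
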
20 changes: 9 additions & 11 deletions bluefoglite/utility.py
@@ -1,6 +1,6 @@
+import collections
import torch
import bluefoglite.torch_api as bfl
-import collections


def broadcast_parameters(params, root_rank):
@@ -22,11 +22,11 @@ def broadcast_parameters(params, root_rank):
# support both named_parameters() and regular parameters()
params = [p if isinstance(p, tuple) else (None, p) for p in params]
else:
raise ValueError("invalid params of type: %s" % type(params))
raise ValueError(f"invalid params of type: {type(params)}")

# Run asynchronous broadcasts.
async_works = []
-for name, p in params:
+for _, p in params:
async_work = bfl.broadcast_nonblocking(p, inplace=True, root_rank=root_rank)
async_works.append(async_work)

@@ -43,11 +43,11 @@ def neighbor_allreduce_parameters(params):
# support both named_parameters() and regular parameters()
params = [p if isinstance(p, tuple) else (None, p) for p in params]
else:
raise ValueError("invalid params of type: %s" % type(params))
raise ValueError(f"invalid params of type: {type(params)}")

# Run asynchronous broadcasts.
async_works = []
-for name, p in params:
+for _, p in params:
if torch.is_floating_point(p):
async_work = bfl.neighbor_allreduce_nonblocking(p, inplace=True)
async_works.append(async_work)
@@ -95,17 +95,15 @@ def broadcast_optimizer_state(optimizer, root_rank, device):
def _get_types(x):
if isinstance(x, collections.Iterable):
return type(x), [_get_types(xi) for xi in x]
-else:
-return type(x)
+return type(x)

# Casts an object encoded in a tensor back into its original type and subtypes
def _recursive_cast(x, dtype):
if isinstance(dtype, tuple):
t, dtypes = dtype
x = t(x)
return t([_recursive_cast(x[i], dtypes[i]) for i in range(len(x))])
-else:
-return dtype(x)
+return dtype(x)

# Some optimizer parameters may be represented as scalars instead of
# tensors. In such cases, we need to wrap the scalar in a tensor, then
Expand Down Expand Up @@ -142,7 +140,7 @@ def _from_tensor():
if option_key == "params":
continue
# Options like the learning rate are scalar, and need to be wrapped in tensors
key = "%s.%d" % (option_key, index)
key = f"{option_key}.{index}"
dtypes = _get_types(option_value)
option_tensor = torch.tensor([option_value], device=device)
callbacks[key] = _create_option_callback(
@@ -161,7 +159,7 @@ def _from_tensor():
if p is None:
continue
occurrences[name] += 1
key = "%s.%d" % (str(name), occurrences[name])
key = f"{str(name)}.{occurrences[name]}"

if not torch.is_tensor(p):
# Wrap the scalar in a FloatTensor, and remember its type
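
Besides the import reordering and the same %-to-f-string conversion, utility.py picks up two more common lint fixes: an unused loop variable renamed to "_" and an "else" dropped after a "return" (no-else-return). A standalone sketch with illustrative names, not repository code:

    params = [("weight", 1.0), ("bias", 2.0)]

    # Before: for name, p in params:  (name was never used)
    for _, p in params:
        print(p)

    # Before: if ...: return ...  else: return ...
    def describe(x):
        if isinstance(x, (list, tuple)):
            return type(x), [describe(xi) for xi in x]
        return type(x)

    print(describe([1, "a"]))
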
2 changes: 1 addition & 1 deletion tools/compile_proto.sh
@@ -3,4 +3,4 @@
# Compiled .proto code to python file

SRC_DIR=bluefoglite/common/tcp
-protoc -I=$SRC_DIR --python_out=$SRC_DIR $SRC_DIR/message.proto
+protoc -I=$SRC_DIR --python_out=$SRC_DIR --mypy_out=$SRC_DIR $SRC_DIR/message.proto
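
The added --mypy_out flag assumes the protoc-gen-mypy plugin from the mypy-protobuf package is installed; with it, protoc also writes a message_pb2.pyi type stub next to the generated message_pb2.py so static type checkers can see the generated message classes.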
