Skip to content

Commit

Permalink
Fix cloud multi-nodes
Browse files Browse the repository at this point in the history
* Copy ssh key to allow connections from master to workers
* Use local ip for manager's ip such that workers can find it and connect to it
* Fix incompatibility between pandas and numpy 2.0.0
  • Loading branch information
satyaog committed Aug 8, 2024
1 parent 0f34dd2 commit fa32dde
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 14 deletions.
1 change: 1 addition & 0 deletions config/cloud-multinodes-system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ system:
- name: manager
# Use 1.1.1.1 as an ip placeholder
ip: 1.1.1.1
port: 5000
# Use this node as the master node or not
main: true
# User to use in remote milabench operations
Expand Down
13 changes: 10 additions & 3 deletions milabench/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,13 @@ def _get_main_and_workers(self):
def _argv(self, **_) -> List:
manager, nodes = self._get_main_and_workers()

# Find local ip such that workers can connect to the port
for manager_ip in manager["ipaddrlist"]:
if ":" in manager_ip or manager_ip == "127.0.0.1":
continue
if all(str.isnumeric(n) for n in manager_ip.split(".")):
break

num_machines = max(1, len(nodes) + 1)

# Cant do that maybe this run is constrained
Expand Down Expand Up @@ -976,9 +983,9 @@ def _argv(self, **_) -> List:
f"--machine_rank={self.rank}",
f"--num_machines={num_machines}",
*deepspeed_argv,
f"--gradient_accumulation_steps={self.pack.config.get('gradient_accumulation_steps', 1)}",
f"--num_cpu_threads_per_process={cpu_per_process}",
f"--main_process_ip={manager['ip']}",
f"--gradient_accumulation_steps={self.pack.config['gradient_accumulation_steps']}",
f"--num_cpu_threads_per_process={self.pack.config['argv']['--cpus_per_gpu']}",
f"--main_process_ip={manager_ip}",
f"--main_process_port={manager['port']}",
f"--num_processes={nproc}",
*self.accelerate_argv,
Expand Down
5 changes: 0 additions & 5 deletions milabench/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@
import os
import sys

import yaml

from milabench.fs import XPath

from . import ROOT_FOLDER
from .commands import (
CmdCommand,
Command,
ListCommand,
SCPCommand,
SequenceCommand,
SSHCommand,
VoidCommand,
Expand Down
49 changes: 44 additions & 5 deletions milabench/scripts/covalent/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,63 @@ def _popen(cmd, *args, _env=None, **kwargs):
assert result and result[0]

all_connection_attributes, _ = result
ssh_key_file:str = None
for hostname, connection_attributes in all_connection_attributes.items():
print(f"hostname::>{hostname}")
for attribute,value in connection_attributes.items():
if attribute == "hostname":
continue
print(f"{attribute}::>{value}")

ssh_key_file = (
ssh_key_file or connection_attributes["ssh_key_file"]
)

if len(all_connection_attributes) >= 1:
fn = pathlib.Path(ssh_key_file)
dispatch_id = ct.dispatch(
ct.lattice(executor.cp_to_remote), disable_run=False
)(f".ssh/{fn.name.split('.')[0]}", str(fn))

result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

if argv:
dispatch_id = ct.dispatch(
ct.lattice(
lambda:ct.electron(_popen, executor=executor)(argv)
),
disable_run=False
ct.lattice(executor.list_running_instances), disable_run=False
)()

result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

return_code, _, _ = result if result is not None else (1, "", "")
assert result

dispatch_ids = set()
for connection_attributes in result.get(
(executor.state_prefix, executor.state_id),
{"env": None}
).values():
kwargs = {
**_get_executor_kwargs(args),
**connection_attributes
}
del kwargs["env"]

_executor:ct.executor.BaseExecutor = executor_cls(**kwargs)

dispatch_ids.add(
ct.dispatch(
ct.lattice(
lambda:ct.electron(_popen, executor=_executor)(argv)
),
disable_run=False
)()
)

for dispatch_id in dispatch_ids:
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

_return_code, _, _ = result if result is not None else (1, "", "")
return_code = return_code or _return_code

finally:
if args.teardown:
result = executor.stop_cloud_instance().result
Expand Down
4 changes: 4 additions & 0 deletions milabench/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ def _resolve_ip(ip):
if not offline:
# Resolve the IP
try:
# Workaround error with `gethostbyaddr` on azure DNS (like
# `inmako.eastus2.cloudapp.azure.com`). A proper fix might be a
# correct network config in terraform.
# socket.herror: [Errno 1] Unknown host
hostname, aliaslist, ipaddrlist = socket.gethostbyname_ex(ip)
lazy_raise = None

Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ blessed = "^1.19.1"
pathspec = "^0.9.0"
cp-template = "^0.3.0"
pandas = ">=1.4.2"
numpy = ">=1.23.0,<2.0.0"
# Work around for compatibility issue between numpy 2.0.0 and pandas
# https://github.com/numpy/numpy/issues/26710
numpy = "^1.23.0"
pynvml = "^11.4.1"
tqdm = "^4.64.1"
pip-tools = "^7.4.1"
Expand Down

0 comments on commit fa32dde

Please sign in to comment.