From 66376f93723577005c7bbbaee08d31e707ea948a Mon Sep 17 00:00:00 2001 From: caozhou <48191911+Caozhou1995@users.noreply.github.com> Date: Fri, 27 Oct 2023 20:05:51 +0800 Subject: [PATCH] [AutoTuner]Improve ETCD fault tolerance (#58314) * add fault tolerant for etcd apis * fix metric bug * fix some bugs --- python/paddle/distributed/auto_tuner/prune.py | 4 -- .../paddle/distributed/auto_tuner/recorder.py | 5 +-- .../distributed/launch/controllers/master.py | 20 +++------- .../distributed/launch/controllers/watcher.py | 2 +- python/paddle/distributed/launch/main.py | 11 +++++- .../distributed/launch/utils/etcd_client.py | 38 +++++++++++++++++++ .../paddle/distributed/launch/utils/nvsmi.py | 2 +- 7 files changed, 57 insertions(+), 25 deletions(-) diff --git a/python/paddle/distributed/auto_tuner/prune.py b/python/paddle/distributed/auto_tuner/prune.py index abae3f606fee15..976089f9d05f2b 100644 --- a/python/paddle/distributed/auto_tuner/prune.py +++ b/python/paddle/distributed/auto_tuner/prune.py @@ -85,10 +85,6 @@ def prune_by_mp(tuner_cfg, cur_cfg, history_cfgs=None): if mp_degree not in mp_degree_candidates: return True - # prune default candidates - if mp_degree > 8: - return True - return False diff --git a/python/paddle/distributed/auto_tuner/recorder.py b/python/paddle/distributed/auto_tuner/recorder.py index 71c1b08ff3ecdf..11517da529f4fe 100644 --- a/python/paddle/distributed/auto_tuner/recorder.py +++ b/python/paddle/distributed/auto_tuner/recorder.py @@ -70,9 +70,8 @@ def get_best(self, metric, direction, mode=None) -> Tuple[dict, bool]: if first_few >= 5: break return (best_cfg, False) - if ( - isinstance(self.history[0]["max_mem_usage"], str) - or self.history[0]["time"] == -1 + if isinstance(self.history[0]["max_mem_usage"], str) or ( + "time" in self.history[0] and self.history[0]["time"] == -1 ): return (self.history[0], True) return (self.history[0], False) diff --git a/python/paddle/distributed/launch/controllers/master.py 
b/python/paddle/distributed/launch/controllers/master.py index d625887b8167f0..27e294907304b5 100644 --- a/python/paddle/distributed/launch/controllers/master.py +++ b/python/paddle/distributed/launch/controllers/master.py @@ -197,8 +197,9 @@ def __init__(self, ctx): host, port = self.endpoint.split(':') if ctx.is_auto_tuner_mode(): - self.etcd_client = ETCDClient(host=host, port=port) - self.client = etcd3.client(host=host, port=port) + self.client = ETCDClient(host=host, port=port) + else: + self.client = etcd3.client(host=host, port=port) def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int): ''' @@ -256,22 +257,13 @@ def register_heartbeat(self, job_id, pod_id, ttl=10): self.job_prefix = f'/paddle/{job_id}' self.heartbeat_prefix = f'{self.job_prefix}/heartbeat' - if self.ctx.is_auto_tuner_mode(): - self.etcd_client.delete_prefix(self.job_prefix) - lease = self.etcd_client.lease(ttl) - else: - self.client.delete_prefix(self.job_prefix) - lease = self.client.lease(ttl) + self.client.delete_prefix(self.job_prefix) + lease = self.client.lease(ttl) # self.client.delete_prefix(self.job_prefix) beat_path = f"{self.heartbeat_prefix}/{pod_id}" - if self.ctx.is_auto_tuner_mode(): - self.etcd_client.put( - beat_path, pod_id.encode('latin-1'), lease=lease - ) - else: - self.client.put(beat_path, pod_id.encode('latin-1'), lease=lease) + self.client.put(beat_path, pod_id.encode('latin-1'), lease=lease) def _beat_watch(event): self.ctx.status.restart() diff --git a/python/paddle/distributed/launch/controllers/watcher.py b/python/paddle/distributed/launch/controllers/watcher.py index 25855572620f85..fd5571c39d4434 100644 --- a/python/paddle/distributed/launch/controllers/watcher.py +++ b/python/paddle/distributed/launch/controllers/watcher.py @@ -23,7 +23,7 @@ class Watcher: def __init__(self, ctx): self.ctx = ctx - self.interval = 30 + self.interval = 5 self.gpu_util = [] diff --git a/python/paddle/distributed/launch/main.py 
b/python/paddle/distributed/launch/main.py index 1fc2e6713e1b63..e24984e6f1479c 100644 --- a/python/paddle/distributed/launch/main.py +++ b/python/paddle/distributed/launch/main.py @@ -656,6 +656,7 @@ def launch(): elif "OK" not in status: timeout_flag = False + has_error = False if err & (1 << 0): ctx.logger.warning( f"Read metric failed for parameters: {log_dir}" @@ -665,6 +666,7 @@ def launch(): cur_cfg['time'] = -1 cur_cfg[tuner_cfg['metric_cfg']['name']] = None cur_cfg["max_mem_usage"] = mem if not OOM_flag else "OOM" + has_error = True if err & (1 << 1): ctx.logger.warning(f"Out of memory for parameters: {log_dir}") @@ -673,6 +675,7 @@ def launch(): cur_cfg['time'] = -1 cur_cfg[tuner_cfg['metric_cfg']['name']] = None cur_cfg["max_mem_usage"] = "OOM" + has_error = True # not err & (1 << 1): do not record memory usage when out of memory if err & (1 << 2) and not err & (1 << 1): @@ -684,18 +687,20 @@ def launch(): ) cur_cfg["max_mem_usage"] = None if not OOM_flag else "OOM" - if not err and timeout_flag: + if not has_error and timeout_flag: # for pruner use cur_cfg['time'] = metric cur_cfg[tuner_cfg['metric_cfg']['name']] = metric cur_cfg["max_mem_usage"] = mem if not OOM_flag else "OOM" - if not err and not timeout_flag: + if not has_error and not timeout_flag: cur_cfg['time'] = -1 cur_cfg[tuner_cfg['metric_cfg']['name']] = None cur_cfg["max_mem_usage"] = None if not OOM_flag else "OOM" # record history + if tuner_cfg['metric_cfg']['name'] not in cur_cfg: + cur_cfg[tuner_cfg['metric_cfg']['name']] = None cur_cfg['job_id'] = job_id recorder.add_cfg(**cur_cfg) recorder.store_history(history_file_path) @@ -794,6 +799,8 @@ def launch(): ctx.logger.info(f"AutoTuner ends in {end_time-start_time}s.") logger.info(f"AutoTuner ends in {end_time-start_time}s.") # launch best cfg + if not tuner_cfg.get("run_best", True): + sys.exit() new_args = gen_new_args(raw_args, best_cfg, tuner_cfg, run_best=True) ctx.run_best = True ctx.args.training_script_args = new_args diff --git 
a/python/paddle/distributed/launch/utils/etcd_client.py b/python/paddle/distributed/launch/utils/etcd_client.py
index e4bbf8e1409a4d..a96c7a034fdb18 100644
--- a/python/paddle/distributed/launch/utils/etcd_client.py
+++ b/python/paddle/distributed/launch/utils/etcd_client.py
@@ -140,3 +140,63 @@ def lease(self, ttl, lease_id=None):
 
         if times >= self.retry_times:
             raise ValueError(f"Lease failed after {self.retry_times} times.")
+
+    def add_watch_prefix_callback(self, key_prefix, callback, **kwargs):
+        """Register a watch callback on ``key_prefix``, retrying on failure.
+
+        Forwards to ``self.client.add_watch_prefix_callback`` and returns its
+        result. Any ``Exception`` is logged and the call is retried after a
+        one-second sleep, for at most ``self.retry_times`` attempts.
+
+        Raises:
+            ValueError: if no attempt succeeded; chained to the last error.
+        """
+        last_error = None
+        times = 0
+        while times < self.retry_times:
+            try:
+                return self.client.add_watch_prefix_callback(
+                    key_prefix, callback, **kwargs
+                )
+            except Exception as e:
+                last_error = e
+                times += 1
+                logging.info(
+                    f"Add watch prefix callback failed with exception {e}, retry after 1 second."
+                )
+                time.sleep(1)
+
+        # Success returns from inside the loop, so reaching this point means
+        # every attempt failed.
+        raise ValueError(
+            f"Add watch prefix callback failed after {self.retry_times} times."
+        ) from last_error
+
+    def cancel_watch(self, watch_id):
+        """Cancel the watch ``watch_id``, retrying on failure.
+
+        Forwards to ``self.client.cancel_watch`` and returns its result. Any
+        ``Exception`` is logged and the call is retried after a one-second
+        sleep, for at most ``self.retry_times`` attempts.
+
+        Raises:
+            ValueError: if no attempt succeeded; chained to the last error.
+        """
+        last_error = None
+        times = 0
+        while times < self.retry_times:
+            try:
+                return self.client.cancel_watch(watch_id)
+            except Exception as e:
+                last_error = e
+                times += 1
+                logging.info(
+                    f"Cancel watch failed with exception {e}, retry after 1 second."
+                )
+                time.sleep(1)
+
+        # Success returns from inside the loop, so reaching this point means
+        # every attempt failed.
+        raise ValueError(
+            f"Cancel watch failed after {self.retry_times} times."
+        ) from last_error
diff --git a/python/paddle/distributed/launch/utils/nvsmi.py b/python/paddle/distributed/launch/utils/nvsmi.py
index 0c51456bf1204f..232ccce2209cce 100644
--- a/python/paddle/distributed/launch/utils/nvsmi.py
+++ b/python/paddle/distributed/launch/utils/nvsmi.py
@@ -133,7 +133,7 @@ def get_gpu_util(index=None):
         if index is None or isinstance(index, list)
         else str(index).split(",")
     )
-    if paddle.device.is_compiled_with_cuda():
+    if paddle.device.is_compiled_with_rocm():
         return query_rocm_smi(q, index=index, dtype=d)
     return query_smi(q, index=index, dtype=d)
 