Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fault tolerance support for trial failure #424

Merged
merged 7 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/suggestion/NAS_Reinforcement_Learning/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ This neural architecture can be visualized as
![a neural netowrk architecure example](example.png)

## To Do
1. Change LSTM cell from self defined functions in LSTM.py to `tf.nn.rnn_cell.LSTMCell`
2. Store the suggestion checkpoint to PVC to protect against unexpected nasrl service pod restarts
3. Add `RequestCount` into API so that the suggestion can clean the information of completed studies.
1. Add 'micro' mode, which means searching for a neural cell instead of the whole neural network.
2. Add supoort for recurrent neural networks and build a training container for the Penn Treebank task.
3. Add parameter sharing, if possible.
4. Change LSTM cell from self defined functions in LSTM.py to `tf.nn.rnn_cell.LSTMCell`
5. Store the suggestion checkpoint to PVC to protect against unexpected nasrl service pod restarts
6. Add `RequestCount` into API so that the suggestion can clean the information of completed studies.
74 changes: 55 additions & 19 deletions pkg/suggestion/nasrl_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@

MANAGER_ADDRESS = "vizier-core"
MANAGER_PORT = 6789
RESPAWN_SLEEP = 20
RESPAWN_LIMIT = 10


class NAS_RL_StudyJob(object):
def __init__(self, request, logger):
self.logger = logger
self.study_id = request.study_id
self.param_id = request.param_id
self.num_trials = request.request_number
self.num_trials = 1
if request.request_number > 0:
self.num_trials = request.request_number
self.study_name = None
self.tf_graph = tf.Graph()
self.prev_trial_ids = list()
self.prev_trials = None
self.ctrl_cache_file = "ctrl_cache/{}/{}.ckpt".format(request.study_id, request.study_id)
self.ctrl_step = 0
self.is_first_run = True
Expand All @@ -37,6 +42,7 @@ def __init__(self, request, logger):
self.search_space = None
self.opt_direction = None
self.objective_name = None
self.respawn_count = 0

self.logger.info("-" * 100 + "\nSetting Up Suggestion for StudyJob ID {}\n".format(request.study_id) + "-" * 100)
self._get_study_param()
Expand Down Expand Up @@ -272,12 +278,31 @@ def GetSuggestions(self, request, context):
valid_acc = ctrl.reward
result = self.GetEvaluationResult(study)

# In some rare cases, GetEvaluationResult() may return None
# if GetSuggestions() is called before all the trials are completed
while result is None:
self.logger.warning(">>> GetEvaluationResult() returns None")
time.sleep(20)
result = self.GetEvaluationResult(study)

# Sometimes training container may fail and GetEvaluationResult() will return None
# In this case, the Suggestion will:
# 1. Firstly try to respawn the previous trials after waiting for RESPAWN_SLEEP seconds
# 2. If respawning the trials for RESPAWN_LIMIT times still cannot collect valid results,
# then fail the task because it may indicate that the training container has errors.

if result is None:
if study.respawn_count >= RESPAWN_LIMIT:
self.logger.warning(">>> Suggestion has spawned trials for {} times, but they all failed.".format(RESPAWN_LIMIT))
self.logger.warning(">>> Please check whether the training container is correctly implemented")
self.logger.info(">>> StudyJob {} failed".format(study.study_name))
return []

else:
self.logger.warning(">>> GetEvaluationResult() returns None. All the previous trials failed")

self.logger.info(">>> Sleep for {} seconds".format(RESPAWN_SLEEP))
time.sleep(RESPAWN_SLEEP)

self.logger.info(">>> Respawn the previous trials")
study.respawn_count += 1
return self.SpawnTrials(study, study.prev_trials)

study.respawn_count = 0

# This LSTM network is designed to maximize the metrics
# However, if the user wants to minimize the metrics, we can take the negative of the result
Expand All @@ -287,7 +312,7 @@ def GetSuggestions(self, request, context):
loss, entropy, lr, gn, bl, skip, _ = sess.run(
fetches=run_ops,
feed_dict={valid_acc: result})
self.logger.info(">>> Suggetion updated. LSTM Controller Reward: {}".format(loss))
self.logger.info(">>> Suggestion updated. LSTM Controller Reward: {}".format(loss))

candidates = list()
for _ in range(study.num_trials):
Expand Down Expand Up @@ -342,17 +367,21 @@ def GetSuggestions(self, request, context):
)
)

self.prev_trial_ids = list()
return self.SpawnTrials(study, trials)

def SpawnTrials(self, study, trials):
study.prev_trials = trials
study.prev_trial_ids = list()
self.logger.info("")
channel = grpc.beta.implementations.insecure_channel(MANAGER_ADDRESS, MANAGER_PORT)
with api_pb2.beta_create_Manager_stub(channel) as client:
for i, t in enumerate(trials):
ctrep = client.CreateTrial(api_pb2.CreateTrialRequest(trial=t), 10)
trials[i].trial_id = ctrep.trial_id
self.prev_trial_ids.append(ctrep.trial_id)
study.prev_trial_ids.append(ctrep.trial_id)

self.logger.info(">>> {} Trials were created:".format(study.num_trials))
for t in self.prev_trial_ids:
for t in study.prev_trial_ids:
self.logger.info(t)
self.logger.info("")

Expand All @@ -368,16 +397,23 @@ def GetEvaluationResult(self, study):

completed_trials = dict()
for t in trials_list:
if t.Worker.trial_id in self.prev_trial_ids and t.Worker.status == api_pb2.COMPLETED:
if t.Worker.trial_id in study.prev_trial_ids and t.Worker.status == api_pb2.COMPLETED:
for ml in t.metrics_logs:
if ml.name == study.objective_name:
completed_trials[t.Worker.trial_id] = float(ml.values[-1].value)

if len(completed_trials) == study.num_trials:
self.logger.info(">>> Evaluation results of previous trials:")
for k in completed_trials:
self.logger.info("{}: {}".format(k, completed_trials[k]))
avg_metrics = sum(completed_trials.values()) / study.num_trials

n_complete = len(completed_trials)
n_fail = study.num_trials - n_complete

self.logger.info(">>> {} Trials succeeded, {} Trials failed:".format(n_complete, n_fail))
for tid in study.prev_trial_ids:
if tid in completed_trials:
self.logger.info("{}: {}".format(tid, completed_trials[tid]))
else:
self.logger.info("{}: Failed".format(tid))

if n_complete > 0:
avg_metrics = sum(completed_trials.values()) / n_complete
self.logger.info("The average is {}\n".format(avg_metrics))

return avg_metrics
return avg_metrics