From b9fb440681ccb13142ebb68bf72a905a119021e5 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 23 Jan 2023 12:54:06 +0100 Subject: [PATCH 1/3] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 141 ++++++++++++++++------------------ 1 file changed, 68 insertions(+), 73 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 47d50a12db8c4..4afd6b8639f3d 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -493,7 +493,8 @@ def setup(self, config: AlgorithmConfig) -> None: self._record_usage(self.config) - self.callbacks = self.config["callbacks"]() + # Create the callbacks object. + self.callbacks = self.config.callbacks_class() # Create local replay buffer if necessary. self.local_replay_buffer = self._create_local_replay_buffer_if_necessary( @@ -523,7 +524,7 @@ def setup(self, config: AlgorithmConfig) -> None: error=True, help="Running OPE during training is not recommended.", ) - self.config["off_policy_estimation_methods"] = ope_dict + self.config.off_policy_estimation_methods = ope_dict # Deprecated way of implementing Trainer sub-classes (or "templates" # via the `build_trainer` utility function). @@ -557,14 +558,14 @@ def setup(self, config: AlgorithmConfig) -> None: validate_env=self.validate_env, default_policy_class=self.get_default_policy_class(self.config), config=self.config, - num_workers=self.config["num_workers"], + num_workers=self.config.num_rollout_workers, local_worker=True, logdir=self.logdir, ) # TODO (avnishn): Remove the execution plan API by q1 2023 # Function defining one single training iteration's behavior. - if self.config["_disable_execution_plan_api"]: + if self.config._disable_execution_plan_api: # Ensure remote workers are initially in sync with the local worker. self.workers.sync_weights() # LocalIterator-creating "execution plan". @@ -605,13 +606,13 @@ def setup(self, config: AlgorithmConfig) -> None: validate_env=None, default_policy_class=self.get_default_policy_class(self.config), config=self.evaluation_config, - num_workers=self.config["evaluation_num_workers"], + num_workers=self.config.evaluation_num_workers, # Don't even create a local worker if num_workers > 0. local_worker=False, logdir=self.logdir, ) - if self.config["enable_async_evaluation"]: + if self.config.enable_async_evaluation: self._evaluation_weights_seq_number = 0 self.evaluation_dataset = None @@ -641,7 +642,7 @@ def setup(self, config: AlgorithmConfig) -> None: "dm": DirectMethod, "dr": DoublyRobust, } - for name, method_config in self.config["off_policy_estimation_methods"].items(): + for name, method_config in self.config.off_policy_estimation_methods.items(): method_type = method_config.pop("type") if method_type in ope_types: deprecation_warning( @@ -662,7 +663,7 @@ def setup(self, config: AlgorithmConfig) -> None: # offline evaluators. policy = self.get_policy() if issubclass(method_type, OffPolicyEstimator): - method_config["gamma"] = self.config["gamma"] + method_config["gamma"] = self.config.gamma self.reward_estimators[name] = method_type(policy, **method_config) else: raise ValueError( @@ -739,7 +740,7 @@ def step(self) -> ResultDict: results: ResultDict = {} # Parallel eval + training: Kick off evaluation-loop and parallel train() call. 
- if evaluate_this_iter and self.config["evaluation_parallel_to_training"]: + if evaluate_this_iter and self.config.evaluation_parallel_to_training: ( results, train_iter_ctx, @@ -751,12 +752,12 @@ def step(self) -> ResultDict: results, train_iter_ctx = self._run_one_training_iteration() # Sequential: Train (already done above), then evaluate. - if evaluate_this_iter and not self.config["evaluation_parallel_to_training"]: + if evaluate_this_iter and not self.config.evaluation_parallel_to_training: results.update(self._run_one_evaluation(train_future=None)) # Attach latest available evaluation results to train results, # if necessary. - if not evaluate_this_iter and self.config["always_attach_evaluation_results"]: + if not evaluate_this_iter and self.config.always_attach_evaluation_results: assert isinstance( self.evaluation_metrics, dict ), "Trainer.evaluate() needs to return a dict." @@ -767,17 +768,15 @@ def step(self) -> ResultDict: self._sync_filters_if_needed( from_worker=self.workers.local_worker(), workers=self.workers, - timeout_seconds=self.config[ - "sync_filters_on_rollout_workers_timeout_s" - ], + timeout_seconds=self.config.sync_filters_on_rollout_workers_timeout_s, ) # TODO (avnishn): Remove the execution plan API by q1 2023 # Collect worker metrics and add combine them with `results`. - if self.config["_disable_execution_plan_api"]: + if self.config._disable_execution_plan_api: episodes_this_iter = collect_episodes( self.workers, self._remote_worker_ids_for_metrics(), - timeout_seconds=self.config["metrics_episode_collection_timeout_s"], + timeout_seconds=self.config.metrics_episode_collection_timeout_s, ) results = self._compile_iteration_results( episodes_this_iter=episodes_this_iter, @@ -786,8 +785,8 @@ def step(self) -> ResultDict: ) # Check `env_task_fn` for possible update of the env's task. - if self.config["env_task_fn"] is not None: - if not callable(self.config["env_task_fn"]): + if self.config.env_task_fn is not None: + if not callable(self.config.env_task_fn): raise ValueError( "`env_task_fn` must be None or a callable taking " "[train_results, env, env_ctx] as args!" @@ -799,7 +798,7 @@ def fn(env, env_context, task_fn): if cur_task != new_task: env.set_task(new_task) - fn = functools.partial(fn, task_fn=self.config["env_task_fn"]) + fn = functools.partial(fn, task_fn=self.config.env_task_fn) self.workers.foreach_env_with_context(fn) return results @@ -834,20 +833,18 @@ def evaluate( self._sync_filters_if_needed( from_worker=self.workers.local_worker(), workers=self.evaluation_workers, - timeout_seconds=self.config[ - "sync_filters_on_rollout_workers_timeout_s" - ], + timeout_seconds=self.config.sync_filters_on_rollout_workers_timeout_s, ) self.callbacks.on_evaluate_start(algorithm=self) - if self.config["custom_eval_function"]: + if self.config.custom_evaluation_function: logger.info( "Running custom eval function {}".format( - self.config["custom_eval_function"] + self.config.custom_evaluation_function ) ) - metrics = self.config["custom_eval_function"](self, self.evaluation_workers) + metrics = self.config.custom_evaluation_function(self, self.evaluation_workers) if not metrics or not isinstance(metrics, dict): raise ValueError( "Custom eval function must return " @@ -872,15 +869,15 @@ def evaluate( # How many episodes/timesteps do we need to run? # In "auto" mode (only for parallel eval + training): Run as long # as training lasts. 
- unit = self.config["evaluation_duration_unit"] + unit = self.config.evaluation_duration_unit eval_cfg = self.evaluation_config - rollout = eval_cfg["rollout_fragment_length"] - num_envs = eval_cfg["num_envs_per_worker"] - auto = self.config["evaluation_duration"] == "auto" + rollout = eval_cfg.rollout_fragment_length + num_envs = eval_cfg.num_envs_per_worker + auto = self.config.evaluation_duration == "auto" duration = ( - self.config["evaluation_duration"] + self.config.evaluation_duration if not auto - else (self.config["evaluation_num_workers"] or 1) + else (self.config.evaluation_num_workers or 1) * (1 if unit == "episodes" else rollout) ) agent_steps_this_iter = 0 @@ -893,7 +890,7 @@ def evaluate( def duration_fn(num_units_done): return duration - num_units_done - logger.info(f"Evaluating current policy for {duration} {unit}.") + logger.info(f"Evaluating current state of {self} for {duration} {unit}.") metrics = None all_batches = [] @@ -914,8 +911,8 @@ def duration_fn(num_units_done): all_batches.append(batch) metrics = collect_metrics( self.workers, - keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"], - timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"], + keep_custom_metrics=eval_cfg.keep_per_episode_custom_metrics, + timeout_seconds=eval_cfg.metrics_episode_collection_timeout_s, ) # Evaluation worker set only has local worker. @@ -960,13 +957,13 @@ def duration_fn(num_units_done): func=lambda w: w.sample(), local_worker=False, remote_worker_ids=selected_eval_worker_ids, - timeout_seconds=self.config["evaluation_sample_timeout_s"], + timeout_seconds=self.config.evaluation_sample_timeout_s, ) if len(batches) != len(selected_eval_worker_ids): logger.warning( "Calling `sample()` on your remote evaluation worker(s) " "resulted in a timeout (after the configured " - f"{self.config['evaluation_sample_timeout_s']} seconds)! " + f"{self.config.evaluation_sample_timeout_s} seconds)! " "Try to set `evaluation_sample_timeout_s` in your config" " to a larger value." + ( @@ -1005,7 +1002,7 @@ def duration_fn(num_units_done): env_steps_this_iter += _env_steps logger.info( - f"Ran round {_round} of parallel evaluation " + f"Ran round {_round} of non-parallel evaluation " f"({num_units_done}/{duration if not auto else '?'} " f"{unit} done)" ) @@ -1017,8 +1014,8 @@ def duration_fn(num_units_done): if metrics is None: metrics = collect_metrics( self.evaluation_workers, - keep_custom_metrics=self.config["keep_per_episode_custom_metrics"], - timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"], + keep_custom_metrics=self.config.keep_per_episode_custom_metrics, + timeout_seconds=eval_cfg.metrics_episode_collection_timeout_s, ) metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter @@ -1032,9 +1029,7 @@ def duration_fn(num_units_done): for batch in all_batches: estimate_result = estimator.estimate( batch, - split_batch_by_episode=self.config[ - "ope_split_batch_by_episode" - ], + split_batch_by_episode=self.config.ope_split_batch_by_episode, ) estimates[name].append(estimate_result) @@ -1084,15 +1079,15 @@ def _evaluate_async( # How many episodes/timesteps do we need to run? # In "auto" mode (only for parallel eval + training): Run as long # as training lasts. 
- unit = self.config["evaluation_duration_unit"] + unit = self.config.evaluation_duration_unit eval_cfg = self.evaluation_config - rollout = eval_cfg["rollout_fragment_length"] - num_envs = eval_cfg["num_envs_per_worker"] - auto = self.config["evaluation_duration"] == "auto" + rollout = eval_cfg.rollout_fragment_length + num_envs = eval_cfg.num_envs_per_worker + auto = self.config.evaluation_duration == "auto" duration = ( - self.config["evaluation_duration"] + self.config.evaluation_duration if not auto - else (self.config["evaluation_num_workers"] or 1) + else (self.config.evaluation_num_workers or 1) * (1 if unit == "episodes" else rollout) ) @@ -1103,17 +1098,17 @@ def _evaluate_async( self._sync_filters_if_needed( from_worker=self.workers.local_worker(), workers=self.evaluation_workers, - timeout_seconds=eval_cfg.get("sync_filters_on_rollout_workers_timeout_s"), + timeout_seconds=eval_cfg.sync_filters_on_rollout_workers_timeout_s, ) - if self.config["custom_eval_function"]: + if self.config.custom_evaluation_function: raise ValueError( - "`custom_eval_function` not supported in combination " + "`config.custom_evaluation_function` not supported in combination " "with `enable_async_evaluation=True` config setting!" ) if self.evaluation_workers is None and ( self.workers.local_worker().input_reader is None - or self.config["evaluation_num_workers"] == 0 + or self.config.evaluation_num_workers == 0 ): raise ValueError( "Evaluation w/o eval workers (calling Algorithm.evaluate() w/o " @@ -1126,7 +1121,7 @@ def _evaluate_async( agent_steps_this_iter = 0 env_steps_this_iter = 0 - logger.info(f"Evaluating current policy for {duration} {unit}.") + logger.info(f"Evaluating current state of {self} for {duration} {unit}.") all_batches = [] @@ -1310,11 +1305,11 @@ def training_step(self) -> ResultDict: # Collect SampleBatches from sample workers until we have a full batch. if self.config.count_steps_by == "agent_steps": train_batch = synchronous_parallel_sample( - worker_set=self.workers, max_agent_steps=self.config["train_batch_size"] + worker_set=self.workers, max_agent_steps=self.config.train_batch_size ) else: train_batch = synchronous_parallel_sample( - worker_set=self.workers, max_env_steps=self.config["train_batch_size"] + worker_set=self.workers, max_env_steps=self.config.train_batch_size ) train_batch = train_batch.as_multi_agent() self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps() @@ -1417,17 +1412,17 @@ def compute_single_action( full_fetch: Whether to return extra action fetch results. This is always set to True if `state` is specified. explore: Whether to apply exploration to the action. - Default: None -> use self.config["explore"]. + Default: None -> use self.config.explore. timestep: The current (sampling) time step. episode: This provides access to all of the internal episodes' state, which may be useful for model-based or multi-agent algorithms. unsquash_action: Should actions be unsquashed according to the env's/Policy's action space? If None, use the value of - self.config["normalize_actions"]. + self.config.normalize_actions. clip_action: Should actions be clipped according to the env's/Policy's action space? If None, use the value of - self.config["clip_actions"]. + self.config.clip_actions. Keyword Args: kwargs: forward compatibility placeholder @@ -1458,10 +1453,10 @@ def compute_single_action( # `unsquash_action` is None: Use value of config['normalize_actions']. 
if unsquash_action is None: - unsquash_action = self.config["normalize_actions"] + unsquash_action = self.config.normalize_actions # `clip_action` is None: Use value of config['clip_actions']. elif clip_action is None: - clip_action = self.config["clip_actions"] + clip_action = self.config.clip_actions # User provided an input-dict: Assert that `obs`, `prev_a|r`, `state` # are all None. @@ -1612,17 +1607,17 @@ def compute_actions( full_fetch: Whether to return extra action fetch results. This is always set to True if RNN state is specified. explore: Whether to pick an exploitation or exploration - action (default: None -> use self.config["explore"]). + action (default: None -> use self.config.explore). timestep: The current (sampling) time step. episodes: This provides access to all of the internal episodes' state, which may be useful for model-based or multi-agent algorithms. unsquash_actions: Should actions be unsquashed according to the env's/Policy's action space? If None, use - self.config["normalize_actions"]. + self.config.normalize_actions. clip_actions: Should actions be clipped according to the env's/Policy's action space? If None, use - self.config["clip_actions"]. + self.config.clip_actions. Keyword Args: kwargs: forward compatibility placeholder @@ -1642,10 +1637,10 @@ def compute_actions( # `unsquash_actions` is None: Use value of config['normalize_actions']. if unsquash_actions is None: - unsquash_actions = self.config["normalize_actions"] + unsquash_actions = self.config.normalize_actions # `clip_actions` is None: Use value of config['clip_actions']. elif clip_actions is None: - clip_actions = self.config["clip_actions"] + clip_actions = self.config.clip_actions # Preprocess obs and states. state_defined = state is not None @@ -2259,7 +2254,7 @@ def _sync_filters_if_needed( FilterManager.synchronize( from_worker.filters, workers, - update_remote=self.config["synchronize_filters"], + update_remote=self.config.synchronize_filters, timeout_seconds=timeout_seconds, ) logger.debug("synchronized filters: {}".format(from_worker.filters)) @@ -2803,11 +2798,11 @@ def _should_create_evaluation_rollout_workers(cls, eval_config: "AlgorithmConfig Returns False if we need to run offline evaluation (with ope.estimate_on_dastaset API) or when local worker is to be used for evaluation. Note: We only use estimate_on_dataset API with bandits for now. - That is when ope_split_batch_by_episode is False. TODO: In future we will do - the same for episodic RL OPE. + That is when ope_split_batch_by_episode is False. + TODO: In future we will do the same for episodic RL OPE. """ run_offline_evaluation = ( - eval_config.get("off_policy_estimation_methods") + eval_config.off_policy_estimation_methods and not eval_config.ope_split_batch_by_episode ) return not run_offline_evaluation and ( @@ -2857,7 +2852,7 @@ def _compile_iteration_results( # Calculate how many (if any) of older, historical episodes we have to add to # `episodes_this_iter` in order to reach the required smoothing window. episodes_for_metrics = episodes_this_iter[:] - missing = self.config["metrics_num_episodes_for_smoothing"] - len( + missing = self.config.metrics_num_episodes_for_smoothing - len( episodes_this_iter ) # We have to add some older episodes to reach the smoothing window size. 
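The two hunks around this point touch the metrics smoothing window. A minimal standalone sketch of that bookkeeping, using the same names as the patched code; the helper function itself is illustrative only and not part of this diff:

    # Sketch (not part of the patch): how the smoothing window is filled.
    # If the current iteration produced fewer episodes than the configured
    # window, the most recent historical episodes are prepended so that
    # summarize_episodes() sees up to `metrics_num_episodes_for_smoothing`
    # episodes in total.
    def episodes_for_smoothing(
        episodes_this_iter: list,
        episode_history: list,
        metrics_num_episodes_for_smoothing: int = 100,
    ) -> list:
        episodes_for_metrics = episodes_this_iter[:]
        missing = metrics_num_episodes_for_smoothing - len(episodes_this_iter)
        if missing > 0:
            # Prepend the `missing` most recent historical episodes.
            episodes_for_metrics = episode_history[-missing:] + episodes_this_iter
        return episodes_for_metrics

For example, with 3 fresh episodes and a window of 100, the last 97 historical episodes are re-used, which is exactly what the assertion in the following hunk checks.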
@@ -2865,7 +2860,7 @@ def _compile_iteration_results( episodes_for_metrics = self._episode_history[-missing:] + episodes_this_iter assert ( len(episodes_for_metrics) - <= self.config["metrics_num_episodes_for_smoothing"] + <= self.config.metrics_num_episodes_for_smoothing ) # Note that when there are more than `metrics_num_episodes_for_smoothing` # episodes in `episodes_for_metrics`, leave them as-is. In this case, we'll @@ -2875,12 +2870,12 @@ def _compile_iteration_results( # needed. self._episode_history.extend(episodes_this_iter) self._episode_history = self._episode_history[ - -self.config["metrics_num_episodes_for_smoothing"] : + -self.config.metrics_num_episodes_for_smoothing: ] results["sampler_results"] = summarize_episodes( episodes_for_metrics, episodes_this_iter, - self.config["keep_per_episode_custom_metrics"], + self.config.keep_per_episode_custom_metrics, ) # TODO: Don't dump sampler results into top-level. results.update(results["sampler_results"]) From 9bdb69050d538277a7617b05ffdd6ecd87f7e18e Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 23 Jan 2023 20:01:56 +0100 Subject: [PATCH 2/3] LINT Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 4afd6b8639f3d..351316c626dcc 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -844,7 +844,9 @@ def evaluate( self.config.custom_evaluation_function ) ) - metrics = self.config.custom_evaluation_function(self, self.evaluation_workers) + metrics = self.config.custom_evaluation_function( + self, self.evaluation_workers + ) if not metrics or not isinstance(metrics, dict): raise ValueError( "Custom eval function must return " @@ -2870,7 +2872,7 @@ def _compile_iteration_results( # needed. self._episode_history.extend(episodes_this_iter) self._episode_history = self._episode_history[ - -self.config.metrics_num_episodes_for_smoothing: + -self.config.metrics_num_episodes_for_smoothing : ] results["sampler_results"] = summarize_episodes( episodes_for_metrics, From f47ad79a8132573ffb347e950a3fd5b2ce761b0c Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 24 Jan 2023 16:41:48 +0100 Subject: [PATCH 3/3] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 351316c626dcc..98179cf860cba 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -496,6 +496,15 @@ def setup(self, config: AlgorithmConfig) -> None: # Create the callbacks object. self.callbacks = self.config.callbacks_class() + if self.config.log_level in ["WARN", "ERROR"]: + logger.info( + f"Current log_level is {self.config.log_level}. For more information, " + "set 'log_level': 'INFO' / 'DEBUG' or use the -v and " + "-vv flags." + ) + if self.config.log_level: + logging.getLogger("ray.rllib").setLevel(self.config.log_level) + # Create local replay buffer if necessary. self.local_replay_buffer = self._create_local_replay_buffer_if_necessary( self.config
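Taken together, the series swaps dict-style config lookups (e.g. self.config["train_batch_size"]) for attribute access on AlgorithmConfig (self.config.train_batch_size), picks up the renamed properties along the way (num_workers -> num_rollout_workers, callbacks -> callbacks_class, custom_eval_function -> custom_evaluation_function), and patch 3/3 additionally adds log_level handling to setup(). A rough usage sketch of the attribute-style access from the user side; the builder methods (.rollouts(), .training(), .evaluation()) are taken from the Ray 2.x AlgorithmConfig API and are assumptions here, not part of this diff:

    # Sketch: the attribute-style AlgorithmConfig access this series moves to.
    from ray.rllib.algorithms.ppo import PPOConfig

    config = (
        PPOConfig()
        .rollouts(num_rollout_workers=2)    # formerly config["num_workers"]
        .training(train_batch_size=4000)
        .evaluation(evaluation_num_workers=1)
    )

    # New style, as used throughout the patched Algorithm code:
    assert config.train_batch_size == 4000
    assert config.num_rollout_workers == 2

    # Old style (__getitem__) still resolves for backward compatibility at the
    # time of this patch, but is what the series moves away from inside Algorithm:
    assert config["train_batch_size"] == 4000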