diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py
index 6b9921225fb2..98179cf860cb 100644
--- a/rllib/algorithms/algorithm.py
+++ b/rllib/algorithms/algorithm.py
@@ -493,16 +493,17 @@ def setup(self, config: AlgorithmConfig) -> None:
 
         self._record_usage(self.config)
 
-        self.callbacks = self.config["callbacks"]()
-        log_level = self.config.get("log_level")
-        if log_level in ["WARN", "ERROR"]:
+        # Create the callbacks object.
+        self.callbacks = self.config.callbacks_class()
+
+        if self.config.log_level in ["WARN", "ERROR"]:
             logger.info(
-                "Current log_level is {}. For more information, "
+                f"Current log_level is {self.config.log_level}. For more information, "
                 "set 'log_level': 'INFO' / 'DEBUG' or use the -v and "
-                "-vv flags.".format(log_level)
+                "-vv flags."
             )
-        if self.config.get("log_level"):
-            logging.getLogger("ray.rllib").setLevel(self.config["log_level"])
+        if self.config.log_level:
+            logging.getLogger("ray.rllib").setLevel(self.config.log_level)
 
         # Create local replay buffer if necessary.
         self.local_replay_buffer = self._create_local_replay_buffer_if_necessary(
@@ -532,7 +533,7 @@ def setup(self, config: AlgorithmConfig) -> None:
                 error=True,
                 help="Running OPE during training is not recommended.",
             )
-            self.config["off_policy_estimation_methods"] = ope_dict
+            self.config.off_policy_estimation_methods = ope_dict
 
         # Deprecated way of implementing Trainer sub-classes (or "templates"
         # via the `build_trainer` utility function).
@@ -566,14 +567,14 @@ def setup(self, config: AlgorithmConfig) -> None:
             validate_env=self.validate_env,
             default_policy_class=self.get_default_policy_class(self.config),
             config=self.config,
-            num_workers=self.config["num_workers"],
+            num_workers=self.config.num_rollout_workers,
             local_worker=True,
             logdir=self.logdir,
         )
 
         # TODO (avnishn): Remove the execution plan API by q1 2023
         # Function defining one single training iteration's behavior.
-        if self.config["_disable_execution_plan_api"]:
+        if self.config._disable_execution_plan_api:
             # Ensure remote workers are initially in sync with the local worker.
             self.workers.sync_weights()
             # LocalIterator-creating "execution plan".
@@ -614,13 +615,13 @@ def setup(self, config: AlgorithmConfig) -> None:
                 validate_env=None,
                 default_policy_class=self.get_default_policy_class(self.config),
                 config=self.evaluation_config,
-                num_workers=self.config["evaluation_num_workers"],
+                num_workers=self.config.evaluation_num_workers,
                 # Don't even create a local worker if num_workers > 0.
                 local_worker=False,
                 logdir=self.logdir,
             )
 
-            if self.config["enable_async_evaluation"]:
+            if self.config.enable_async_evaluation:
                 self._evaluation_weights_seq_number = 0
 
         self.evaluation_dataset = None
@@ -650,7 +651,7 @@ def setup(self, config: AlgorithmConfig) -> None:
             "dm": DirectMethod,
             "dr": DoublyRobust,
         }
-        for name, method_config in self.config["off_policy_estimation_methods"].items():
+        for name, method_config in self.config.off_policy_estimation_methods.items():
             method_type = method_config.pop("type")
             if method_type in ope_types:
                 deprecation_warning(
@@ -671,7 +672,7 @@ def setup(self, config: AlgorithmConfig) -> None:
                 # offline evaluators.
                policy = self.get_policy()
                if issubclass(method_type, OffPolicyEstimator):
-                    method_config["gamma"] = self.config["gamma"]
+                    method_config["gamma"] = self.config.gamma
                self.reward_estimators[name] = method_type(policy, **method_config)
            else:
                raise ValueError(
@@ -748,7 +749,7 @@ def step(self) -> ResultDict:
         results: ResultDict = {}
 
         # Parallel eval + training: Kick off evaluation-loop and parallel train() call.
-        if evaluate_this_iter and self.config["evaluation_parallel_to_training"]:
+        if evaluate_this_iter and self.config.evaluation_parallel_to_training:
             (
                 results,
                 train_iter_ctx,
@@ -760,12 +761,12 @@ def step(self) -> ResultDict:
             results, train_iter_ctx = self._run_one_training_iteration()
 
         # Sequential: Train (already done above), then evaluate.
-        if evaluate_this_iter and not self.config["evaluation_parallel_to_training"]:
+        if evaluate_this_iter and not self.config.evaluation_parallel_to_training:
             results.update(self._run_one_evaluation(train_future=None))
 
         # Attach latest available evaluation results to train results,
         # if necessary.
-        if not evaluate_this_iter and self.config["always_attach_evaluation_results"]:
+        if not evaluate_this_iter and self.config.always_attach_evaluation_results:
             assert isinstance(
                 self.evaluation_metrics, dict
             ), "Trainer.evaluate() needs to return a dict."
@@ -776,17 +777,15 @@ def step(self) -> ResultDict:
         self._sync_filters_if_needed(
             from_worker=self.workers.local_worker(),
             workers=self.workers,
-            timeout_seconds=self.config[
-                "sync_filters_on_rollout_workers_timeout_s"
-            ],
+            timeout_seconds=self.config.sync_filters_on_rollout_workers_timeout_s,
         )
         # TODO (avnishn): Remove the execution plan API by q1 2023
         # Collect worker metrics and add combine them with `results`.
-        if self.config["_disable_execution_plan_api"]:
+        if self.config._disable_execution_plan_api:
             episodes_this_iter = collect_episodes(
                 self.workers,
                 self._remote_worker_ids_for_metrics(),
-                timeout_seconds=self.config["metrics_episode_collection_timeout_s"],
+                timeout_seconds=self.config.metrics_episode_collection_timeout_s,
             )
             results = self._compile_iteration_results(
                 episodes_this_iter=episodes_this_iter,
@@ -795,8 +794,8 @@ def step(self) -> ResultDict:
             )
 
         # Check `env_task_fn` for possible update of the env's task.
-        if self.config["env_task_fn"] is not None:
-            if not callable(self.config["env_task_fn"]):
+        if self.config.env_task_fn is not None:
+            if not callable(self.config.env_task_fn):
                 raise ValueError(
                     "`env_task_fn` must be None or a callable taking "
                     "[train_results, env, env_ctx] as args!"
@@ -808,7 +807,7 @@ def fn(env, env_context, task_fn):
                 if cur_task != new_task:
                     env.set_task(new_task)
 
-            fn = functools.partial(fn, task_fn=self.config["env_task_fn"])
+            fn = functools.partial(fn, task_fn=self.config.env_task_fn)
             self.workers.foreach_env_with_context(fn)
 
         return results
@@ -843,20 +842,20 @@ def evaluate(
             self._sync_filters_if_needed(
                 from_worker=self.workers.local_worker(),
                 workers=self.evaluation_workers,
-                timeout_seconds=self.config[
-                    "sync_filters_on_rollout_workers_timeout_s"
-                ],
+                timeout_seconds=self.config.sync_filters_on_rollout_workers_timeout_s,
             )
 
         self.callbacks.on_evaluate_start(algorithm=self)
 
-        if self.config["custom_eval_function"]:
+        if self.config.custom_evaluation_function:
             logger.info(
                 "Running custom eval function {}".format(
-                    self.config["custom_eval_function"]
+                    self.config.custom_evaluation_function
                 )
             )
-            metrics = self.config["custom_eval_function"](self, self.evaluation_workers)
+            metrics = self.config.custom_evaluation_function(
+                self, self.evaluation_workers
+            )
             if not metrics or not isinstance(metrics, dict):
                 raise ValueError(
                     "Custom eval function must return "
@@ -881,15 +880,15 @@ def evaluate(
             # How many episodes/timesteps do we need to run?
             # In "auto" mode (only for parallel eval + training): Run as long
             # as training lasts.
-            unit = self.config["evaluation_duration_unit"]
+            unit = self.config.evaluation_duration_unit
             eval_cfg = self.evaluation_config
-            rollout = eval_cfg["rollout_fragment_length"]
-            num_envs = eval_cfg["num_envs_per_worker"]
-            auto = self.config["evaluation_duration"] == "auto"
+            rollout = eval_cfg.rollout_fragment_length
+            num_envs = eval_cfg.num_envs_per_worker
+            auto = self.config.evaluation_duration == "auto"
             duration = (
-                self.config["evaluation_duration"]
+                self.config.evaluation_duration
                 if not auto
-                else (self.config["evaluation_num_workers"] or 1)
+                else (self.config.evaluation_num_workers or 1)
                 * (1 if unit == "episodes" else rollout)
             )
             agent_steps_this_iter = 0
@@ -902,7 +901,7 @@ def evaluate(
                 def duration_fn(num_units_done):
                     return duration - num_units_done
 
-            logger.info(f"Evaluating current policy for {duration} {unit}.")
+            logger.info(f"Evaluating current state of {self} for {duration} {unit}.")
 
             metrics = None
             all_batches = []
@@ -923,8 +922,8 @@ def duration_fn(num_units_done):
                         all_batches.append(batch)
                 metrics = collect_metrics(
                     self.workers,
-                    keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"],
-                    timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"],
+                    keep_custom_metrics=eval_cfg.keep_per_episode_custom_metrics,
+                    timeout_seconds=eval_cfg.metrics_episode_collection_timeout_s,
                 )
 
             # Evaluation worker set only has local worker.
@@ -969,13 +968,13 @@ def duration_fn(num_units_done):
                     func=lambda w: w.sample(),
                     local_worker=False,
                     remote_worker_ids=selected_eval_worker_ids,
-                    timeout_seconds=self.config["evaluation_sample_timeout_s"],
+                    timeout_seconds=self.config.evaluation_sample_timeout_s,
                 )
                 if len(batches) != len(selected_eval_worker_ids):
                     logger.warning(
                         "Calling `sample()` on your remote evaluation worker(s) "
                         "resulted in a timeout (after the configured "
-                        f"{self.config['evaluation_sample_timeout_s']} seconds)! "
+                        f"{self.config.evaluation_sample_timeout_s} seconds)! "
                         "Try to set `evaluation_sample_timeout_s` in your config"
                         " to a larger value."
+                        (
@@ -1014,7 +1013,7 @@ def duration_fn(num_units_done):
                 env_steps_this_iter += _env_steps
 
                 logger.info(
-                    f"Ran round {_round} of parallel evaluation "
+                    f"Ran round {_round} of non-parallel evaluation "
                     f"({num_units_done}/{duration if not auto else '?'} "
                     f"{unit} done)"
                 )
@@ -1026,8 +1025,8 @@ def duration_fn(num_units_done):
         if metrics is None:
             metrics = collect_metrics(
                 self.evaluation_workers,
-                keep_custom_metrics=self.config["keep_per_episode_custom_metrics"],
-                timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"],
+                keep_custom_metrics=self.config.keep_per_episode_custom_metrics,
+                timeout_seconds=eval_cfg.metrics_episode_collection_timeout_s,
             )
         metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter
         metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter
@@ -1041,9 +1040,7 @@ def duration_fn(num_units_done):
                 for batch in all_batches:
                     estimate_result = estimator.estimate(
                         batch,
-                        split_batch_by_episode=self.config[
-                            "ope_split_batch_by_episode"
-                        ],
+                        split_batch_by_episode=self.config.ope_split_batch_by_episode,
                     )
                     estimates[name].append(estimate_result)
 
@@ -1093,15 +1090,15 @@ def _evaluate_async(
         # How many episodes/timesteps do we need to run?
         # In "auto" mode (only for parallel eval + training): Run as long
         # as training lasts.
-        unit = self.config["evaluation_duration_unit"]
+        unit = self.config.evaluation_duration_unit
         eval_cfg = self.evaluation_config
-        rollout = eval_cfg["rollout_fragment_length"]
-        num_envs = eval_cfg["num_envs_per_worker"]
-        auto = self.config["evaluation_duration"] == "auto"
+        rollout = eval_cfg.rollout_fragment_length
+        num_envs = eval_cfg.num_envs_per_worker
+        auto = self.config.evaluation_duration == "auto"
         duration = (
-            self.config["evaluation_duration"]
+            self.config.evaluation_duration
             if not auto
-            else (self.config["evaluation_num_workers"] or 1)
+            else (self.config.evaluation_num_workers or 1)
             * (1 if unit == "episodes" else rollout)
         )
 
@@ -1112,17 +1109,17 @@ def _evaluate_async(
         self._sync_filters_if_needed(
             from_worker=self.workers.local_worker(),
             workers=self.evaluation_workers,
-            timeout_seconds=eval_cfg.get("sync_filters_on_rollout_workers_timeout_s"),
+            timeout_seconds=eval_cfg.sync_filters_on_rollout_workers_timeout_s,
        )
 
-        if self.config["custom_eval_function"]:
+        if self.config.custom_evaluation_function:
             raise ValueError(
-                "`custom_eval_function` not supported in combination "
+                "`config.custom_evaluation_function` not supported in combination "
                 "with `enable_async_evaluation=True` config setting!"
             )
         if self.evaluation_workers is None and (
             self.workers.local_worker().input_reader is None
-            or self.config["evaluation_num_workers"] == 0
+            or self.config.evaluation_num_workers == 0
         ):
             raise ValueError(
                 "Evaluation w/o eval workers (calling Algorithm.evaluate() w/o "
@@ -1135,7 +1132,7 @@ def _evaluate_async(
         agent_steps_this_iter = 0
         env_steps_this_iter = 0
 
-        logger.info(f"Evaluating current policy for {duration} {unit}.")
+        logger.info(f"Evaluating current state of {self} for {duration} {unit}.")
 
         all_batches = []
 
@@ -1319,11 +1316,11 @@ def training_step(self) -> ResultDict:
         # Collect SampleBatches from sample workers until we have a full batch.
         if self.config.count_steps_by == "agent_steps":
             train_batch = synchronous_parallel_sample(
-                worker_set=self.workers, max_agent_steps=self.config["train_batch_size"]
+                worker_set=self.workers, max_agent_steps=self.config.train_batch_size
             )
         else:
             train_batch = synchronous_parallel_sample(
-                worker_set=self.workers, max_env_steps=self.config["train_batch_size"]
+                worker_set=self.workers, max_env_steps=self.config.train_batch_size
             )
         train_batch = train_batch.as_multi_agent()
         self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
@@ -1426,17 +1423,17 @@ def compute_single_action(
             full_fetch: Whether to return extra action fetch results. This is
                 always set to True if `state` is specified.
             explore: Whether to apply exploration to the action.
-                Default: None -> use self.config["explore"].
+                Default: None -> use self.config.explore.
             timestep: The current (sampling) time step.
             episode: This provides access to all of the internal episodes'
                 state, which may be useful for model-based or multi-agent
                 algorithms.
             unsquash_action: Should actions be unsquashed according to the
                 env's/Policy's action space? If None, use the value of
-                self.config["normalize_actions"].
+                self.config.normalize_actions.
             clip_action: Should actions be clipped according to the env's/Policy's
                 action space? If None, use the value of
-                self.config["clip_actions"].
+                self.config.clip_actions.
 
         Keyword Args:
             kwargs: forward compatibility placeholder
@@ -1467,10 +1464,10 @@ def compute_single_action(
 
         # `unsquash_action` is None: Use value of config['normalize_actions'].
         if unsquash_action is None:
-            unsquash_action = self.config["normalize_actions"]
+            unsquash_action = self.config.normalize_actions
         # `clip_action` is None: Use value of config['clip_actions'].
         elif clip_action is None:
-            clip_action = self.config["clip_actions"]
+            clip_action = self.config.clip_actions
 
         # User provided an input-dict: Assert that `obs`, `prev_a|r`, `state`
         # are all None.
@@ -1621,17 +1618,17 @@ def compute_actions(
             full_fetch: Whether to return extra action fetch results. This is
                 always set to True if RNN state is specified.
             explore: Whether to pick an exploitation or exploration
-                action (default: None -> use self.config["explore"]).
+                action (default: None -> use self.config.explore).
             timestep: The current (sampling) time step.
             episodes: This provides access to all of the internal episodes'
                 state, which may be useful for model-based or multi-agent
                 algorithms.
             unsquash_actions: Should actions be unsquashed according to the
                 env's/Policy's action space? If None, use
-                self.config["normalize_actions"].
+                self.config.normalize_actions.
             clip_actions: Should actions be clipped according to the
                 env's/Policy's action space? If None, use
-                self.config["clip_actions"].
+                self.config.clip_actions.
 
         Keyword Args:
             kwargs: forward compatibility placeholder
@@ -1651,10 +1648,10 @@ def compute_actions(
 
         # `unsquash_actions` is None: Use value of config['normalize_actions'].
         if unsquash_actions is None:
-            unsquash_actions = self.config["normalize_actions"]
+            unsquash_actions = self.config.normalize_actions
         # `clip_actions` is None: Use value of config['clip_actions'].
         elif clip_actions is None:
-            clip_actions = self.config["clip_actions"]
+            clip_actions = self.config.clip_actions
 
         # Preprocess obs and states.
         state_defined = state is not None
@@ -2268,7 +2265,7 @@ def _sync_filters_if_needed(
             FilterManager.synchronize(
                 from_worker.filters,
                 workers,
-                update_remote=self.config["synchronize_filters"],
+                update_remote=self.config.synchronize_filters,
                 timeout_seconds=timeout_seconds,
             )
             logger.debug("synchronized filters: {}".format(from_worker.filters))
@@ -2812,11 +2809,11 @@ def _should_create_evaluation_rollout_workers(cls, eval_config: "AlgorithmConfig
         Returns False if we need to run offline evaluation
         (with ope.estimate_on_dastaset API) or when local worker is to be used for
         evaluation. Note: We only use estimate_on_dataset API with bandits for now.
-        That is when ope_split_batch_by_episode is False. TODO: In future we will do
-        the same for episodic RL OPE.
+        That is when ope_split_batch_by_episode is False.
+        TODO: In future we will do the same for episodic RL OPE.
         """
         run_offline_evaluation = (
-            eval_config.get("off_policy_estimation_methods")
+            eval_config.off_policy_estimation_methods
             and not eval_config.ope_split_batch_by_episode
         )
         return not run_offline_evaluation and (
@@ -2866,7 +2863,7 @@ def _compile_iteration_results(
         # Calculate how many (if any) of older, historical episodes we have to add to
         # `episodes_this_iter` in order to reach the required smoothing window.
         episodes_for_metrics = episodes_this_iter[:]
-        missing = self.config["metrics_num_episodes_for_smoothing"] - len(
+        missing = self.config.metrics_num_episodes_for_smoothing - len(
             episodes_this_iter
         )
         # We have to add some older episodes to reach the smoothing window size.
@@ -2874,7 +2871,7 @@ def _compile_iteration_results(
             episodes_for_metrics = self._episode_history[-missing:] + episodes_this_iter
             assert (
                 len(episodes_for_metrics)
-                <= self.config["metrics_num_episodes_for_smoothing"]
+                <= self.config.metrics_num_episodes_for_smoothing
             )
             # Note that when there are more than `metrics_num_episodes_for_smoothing`
             # episodes in `episodes_for_metrics`, leave them as-is. In this case, we'll
@@ -2884,12 +2881,12 @@ def _compile_iteration_results(
         # needed.
         self._episode_history.extend(episodes_this_iter)
         self._episode_history = self._episode_history[
-            -self.config["metrics_num_episodes_for_smoothing"] :
+            -self.config.metrics_num_episodes_for_smoothing :
         ]
         results["sampler_results"] = summarize_episodes(
             episodes_for_metrics,
             episodes_this_iter,
-            self.config["keep_per_episode_custom_metrics"],
+            self.config.keep_per_episode_custom_metrics,
         )
         # TODO: Don't dump sampler results into top-level.
         results.update(results["sampler_results"])