diff --git a/README.md b/README.md index 7ae898155..ad745eba9 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ make type ``` pip install -e .[docs] -make docs +make doc ``` Spell check for the documentation: @@ -58,7 +58,7 @@ To cite this repository in publications: ``` @misc{torchy-baselines, - author = {Raffin, Antonin and Dormann, Noah and Hill, Ashley and Ernestus, Maximilian and Gleave, Adam and Kanervisto, Anssi}, + author = {Raffin, Antonin and Hill, Ashley and Ernestus, Maximilian and Gleave, Adam and Kanervisto, Anssi and Dormann, Noah}, title = {Torchy Baselines}, year = {2019}, publisher = {GitHub}, diff --git a/docs/index.rst b/docs/index.rst index 0f17563ca..6ae60c14e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -49,7 +49,7 @@ To cite this project in publications: .. code-block:: bibtex @misc{torchy-baselines, - author = {Raffin, Antonin and Dormann, Noah and Hill, Ashley and Ernestus, Maximilian and Gleave, Adam and Kanervisto, Anssi}, + author = {Raffin, Antonin and Hill, Ashley and Ernestus, Maximilian and Gleave, Adam and Kanervisto, Anssi and Dormann, Noah}, title = {Torchy Baselines}, year = {2019}, publisher = {GitHub}, diff --git a/docs/misc/changelog.rst b/docs/misc/changelog.rst index d0d69c19e..708a83a0f 100644 --- a/docs/misc/changelog.rst +++ b/docs/misc/changelog.rst @@ -3,13 +3,14 @@ Changelog ========== -Pre-Release 0.2.0a0 (WIP) +Pre-Release 0.2.0a1 (WIP) ------------------------------ Breaking Changes: ^^^^^^^^^^^^^^^^^ - Python 2 support was dropped, Torchy Baselines now requires Python 3.6 or above - Return type of `evaluation.evaluate_policy()` has been changed +- Refactored the replay buffer to avoid transformation between PyTorch and NumPy New Features: ^^^^^^^^^^^^^ diff --git a/setup.py b/setup.py index e59810878..b9598fc98 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,9 @@ 'gym[classic_control]>=0.10.9', 'numpy', 'torch>=1.2.0', - 'cloudpickle' + 'cloudpickle', + # For reading logs + 'pandas' ], extras_require={ 'tests': [ @@ -32,8 +34,6 @@ 'extra': [ # For render 'opencv-python', - # For reading logs - 'pandas' ] }, description='Pytorch version of Stable Baselines, implementations of reinforcement learning algorithms.', @@ -45,7 +45,7 @@ license="MIT", long_description="", long_description_content_type='text/markdown', - version="0.2.0a0", + version="0.2.0a1", ) # python setup.py sdist diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 000000000..141b2c0d4 --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,85 @@ +import uuid +import json +import os + +import pandas +import gym + +from torchy_baselines.common.monitor import Monitor, get_monitor_files, load_results + + +def test_monitor(): + """ + test the monitor wrapper + """ + env = gym.make("CartPole-v1") + env.seed(0) + monitor_file = "/tmp/stable_baselines-test-{}.monitor.csv".format(uuid.uuid4()) + monitor_env = Monitor(env, monitor_file) + monitor_env.reset() + for _ in range(1000): + _, _, done, _ = monitor_env.step(0) + if done: + monitor_env.reset() + + file_handler = open(monitor_file, 'rt') + + first_line = file_handler.readline() + assert first_line.startswith('#') + metadata = json.loads(first_line[1:]) + assert metadata['env_id'] == "CartPole-v1" + assert set(metadata.keys()) == {'env_id', 't_start'}, "Incorrect keys in monitor metadata" + + last_logline = pandas.read_csv(file_handler, index_col=None) + assert set(last_logline.keys()) == {'l', 't', 'r'}, "Incorrect keys in monitor logline" + file_handler.close() + 
os.remove(monitor_file) + +def test_monitor_load_results(tmp_path): + """ + test load_results on log files produced by the monitor wrapper + """ + tmp_path = str(tmp_path) + env1 = gym.make("CartPole-v1") + env1.seed(0) + monitor_file1 = os.path.join(tmp_path, "stable_baselines-test-{}.monitor.csv".format(uuid.uuid4())) + monitor_env1 = Monitor(env1, monitor_file1) + + monitor_files = get_monitor_files(tmp_path) + assert len(monitor_files) == 1 + assert monitor_file1 in monitor_files + + monitor_env1.reset() + episode_count1 = 0 + for _ in range(1000): + _, _, done, _ = monitor_env1.step(monitor_env1.action_space.sample()) + if done: + episode_count1 += 1 + monitor_env1.reset() + + results_size1 = len(load_results(os.path.join(tmp_path)).index) + assert results_size1 == episode_count1 + + env2 = gym.make("CartPole-v1") + env2.seed(0) + monitor_file2 = os.path.join(tmp_path, "stable_baselines-test-{}.monitor.csv".format(uuid.uuid4())) + monitor_env2 = Monitor(env2, monitor_file2) + monitor_files = get_monitor_files(tmp_path) + assert len(monitor_files) == 2 + assert monitor_file1 in monitor_files + assert monitor_file2 in monitor_files + + monitor_env2.reset() + episode_count2 = 0 + for _ in range(1000): + _, _, done, _ = monitor_env2.step(monitor_env2.action_space.sample()) + if done: + episode_count2 += 1 + monitor_env2.reset() + + results_size2 = len(load_results(os.path.join(tmp_path)).index) + + assert results_size2 == (results_size1 + episode_count2) + + os.remove(monitor_file1) + os.remove(monitor_file2) diff --git a/torchy_baselines/__init__.py b/torchy_baselines/__init__.py index 90d98dccf..e250cc26d 100644 --- a/torchy_baselines/__init__.py +++ b/torchy_baselines/__init__.py @@ -4,4 +4,4 @@ from torchy_baselines.sac import SAC from torchy_baselines.td3 import TD3 -__version__ = "0.2.0a0" +__version__ = "0.2.0a1" diff --git a/torchy_baselines/a2c/a2c.py b/torchy_baselines/a2c/a2c.py index a4e83bee2..f837e9c16 100644 --- a/torchy_baselines/a2c/a2c.py +++ b/torchy_baselines/a2c/a2c.py @@ -119,8 +119,8 @@ def train(self, gradient_steps, batch_size=None): th.nn.utils.clip_grad_norm_(self.policy.parameters(), self.max_grad_norm) self.policy.optimizer.step() - explained_var = explained_variance(self.rollout_buffer.returns.flatten().cpu().numpy(), - self.rollout_buffer.values.flatten().cpu().numpy()) + explained_var = explained_variance(self.rollout_buffer.returns.flatten(), + self.rollout_buffer.values.flatten()) logger.logkv("explained_variance", explained_var) logger.logkv("entropy", entropy.mean().item()) diff --git a/torchy_baselines/common/buffers.py b/torchy_baselines/common/buffers.py index 369841f2e..39fe24b75 100644 --- a/torchy_baselines/common/buffers.py +++ b/torchy_baselines/common/buffers.py @@ -1,6 +1,10 @@ +from typing import Union, Optional, Tuple, Generator + import numpy as np import torch as th +from torchy_baselines.common.vec_env import VecNormalize + class BaseBuffer(object): """ @@ -9,10 +13,16 @@ class BaseBuffer(object): :param buffer_size: (int) Max number of element in the buffer :param obs_dim: (int) Dimension of the observation :param action_dim: (int) Dimension of the action space - :param device: (th.device) + :param device: (Union[th.device, str]) PyTorch device + to which the values will be converted :param n_envs: (int) Number of parallel environments """ - def __init__(self, buffer_size, obs_dim, action_dim, device='cpu', n_envs=1): + def __init__(self, + buffer_size: int, + obs_dim: int, + action_dim: int, + device: Union[th.device, str] = 'cpu', + 
n_envs: int = 1): super(BaseBuffer, self).__init__() self.buffer_size = buffer_size self.obs_dim = obs_dim @@ -23,21 +33,21 @@ def __init__(self, buffer_size, obs_dim, action_dim, device='cpu', n_envs=1): self.n_envs = n_envs @staticmethod - def swap_and_flatten(tensor): + def swap_and_flatten(arr: np.ndarray) -> np.ndarray: """ Swap and then flatten axes 0 (buffer_size) and 1 (n_envs) to convert shape from [n_steps, n_envs, ...] (when ... is the shape of the features) to [n_steps * n_envs, ...] (which maintain the order) - :param tensor: (th.Tensor) - :return: (th.Tensor) + :param arr: (np.ndarray) + :return: (np.ndarray) """ - shape = tensor.shape + shape = arr.shape if len(shape) < 3: shape = shape + (1,) - return tensor.transpose(0, 1).reshape(shape[0] * shape[1], *shape[2:]) + return arr.swapaxes(0, 1).reshape(shape[0] * shape[1], *shape[2:]) - def size(self): + def size(self) -> int: """ :return: (int) The current size of the buffer """ @@ -45,55 +55,75 @@ def size(self): return self.buffer_size return self.pos - def add(self, *args, **kwargs): + def add(self, *args, **kwargs) -> None: """ Add elements to the buffer. """ raise NotImplementedError() - def reset(self): + def reset(self) -> None: """ Reset the buffer. """ self.pos = 0 self.full = False - def sample(self, batch_size, env=None): + def sample(self, + batch_size: int, + env: Optional[VecNormalize] = None + ) -> Tuple[th.Tensor, ...]: """ :param batch_size: (int) Number of element to sample - :param env: (VecNormalize) [Optional] associated gym VecEnv + :param env: (Optional[VecNormalize]) associated gym VecEnv to normalize the observations/rewards when sampling """ upper_bound = self.buffer_size if self.full else self.pos - batch_inds = th.LongTensor( - np.random.randint(0, upper_bound, size=batch_size)) + batch_inds = np.random.randint(0, upper_bound, size=batch_size) return self._get_samples(batch_inds, env=env) - def _get_samples(self, batch_inds, env=None): + def _get_samples(self, + batch_inds: np.ndarray, + env: Optional[VecNormalize] = None + ) -> Tuple[th.Tensor, ...]: """ :param batch_inds: (th.Tensor) - :param env: (gym.Env) + :param env: (Optional[VecNormalize]) :return: ([th.Tensor]) """ raise NotImplementedError() + def to_torch(self, array: np.ndarray, copy: bool = True) -> th.Tensor: + """ + Convert a numpy array to a PyTorch tensor. + Note: it copies the data by default + + :param array: (np.ndarray) + :param copy: (bool) Whether to copy or not the data + (may be useful to avoid changing things be reference) + :return: (th.Tensor) + """ + if copy: + return th.tensor(array).to(self.device) + return th.as_tensor(array).to(self.device) + @staticmethod - def _normalize_obs(obs, env=None): + def _normalize_obs(obs: np.ndarray, + env: Optional[VecNormalize] = None) -> np.ndarray: if env is not None: - # TODO: get rid of pytorch - numpy conversion - return th.FloatTensor(env.normalize_obs(obs.numpy())) + return env.normalize_obs(obs).astype(np.float32) return obs - def _normalize_reward(self, reward, env=None): + def _normalize_reward(self, + reward: np.ndarray, + env: Optional[VecNormalize] = None) -> np.ndarray: if env is not None: - return th.FloatTensor(env.normalize_reward(reward.numpy())) + return env.normalize_reward(reward).astype(np.float32) return reward class ReplayBuffer(BaseBuffer): """ Replay buffer used in off-policy algorithms like SAC/TD3. 
- Adapted from from https://github.com/apourchot/CEM-RL :param buffer_size: (int) Max number of element in the buffer :param obs_dim: (int) Dimension of the observation @@ -101,35 +131,51 @@ class ReplayBuffer(BaseBuffer): :param device: (th.device) :param n_envs: (int) Number of parallel environments """ - def __init__(self, buffer_size, obs_dim, action_dim, device='cpu', n_envs=1): + def __init__(self, + buffer_size: int, + obs_dim: int, + action_dim: int, + device: Union[th.device, str] = 'cpu', + n_envs: int = 1): + super(ReplayBuffer, self).__init__(buffer_size, obs_dim, action_dim, device, n_envs=n_envs) - assert n_envs == 1 - self.observations = th.zeros(self.buffer_size, self.n_envs, self.obs_dim) - self.actions = th.zeros(self.buffer_size, self.n_envs, self.action_dim) - self.next_observations = th.zeros(self.buffer_size, self.n_envs, self.obs_dim) - self.rewards = th.zeros(self.buffer_size, self.n_envs) - self.dones = th.zeros(self.buffer_size, self.n_envs) + assert n_envs == 1, "Replay buffer only support single environment for now" - def add(self, obs, next_obs, action, reward, done): + self.observations = np.zeros((self.buffer_size, self.n_envs, self.obs_dim), dtype=np.float32) + self.actions = np.zeros((self.buffer_size, self.n_envs, self.action_dim), dtype=np.float32) + self.next_observations = np.zeros((self.buffer_size, self.n_envs, self.obs_dim), dtype=np.float32) + self.rewards = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + self.dones = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + + def add(self, + obs: np.ndarray, + next_obs: np.ndarray, + action: np.ndarray, + reward: np.ndarray, + done: np.ndarray) -> None: # Copy to avoid modification by reference - self.observations[self.pos] = th.FloatTensor(np.array(obs).copy()) - self.next_observations[self.pos] = th.FloatTensor(np.array(next_obs).copy()) - self.actions[self.pos] = th.FloatTensor(np.array(action).copy()) - self.rewards[self.pos] = th.FloatTensor(np.array(reward).copy()) - self.dones[self.pos] = th.FloatTensor(np.array(done).copy()) + self.observations[self.pos] = np.array(obs).copy() + self.next_observations[self.pos] = np.array(next_obs).copy() + self.actions[self.pos] = np.array(action).copy() + self.rewards[self.pos] = np.array(reward).copy() + self.dones[self.pos] = np.array(done).copy() self.pos += 1 if self.pos == self.buffer_size: self.full = True self.pos = 0 - def _get_samples(self, batch_inds, env=None): - return (self._normalize_obs(self.observations[batch_inds, 0, :], env).to(self.device), - self.actions[batch_inds, 0, :].to(self.device), - self._normalize_obs(self.next_observations[batch_inds, 0, :], env).to(self.device), - self.dones[batch_inds].to(self.device), - self._normalize_reward(self.rewards[batch_inds], env).to(self.device)) + def _get_samples(self, + batch_inds: np.ndarray, + env: Optional[VecNormalize] = None + ) -> Tuple[th.Tensor, ...]: + data = (self._normalize_obs(self.observations[batch_inds, 0, :], env), + self.actions[batch_inds, 0, :], + self._normalize_obs(self.next_observations[batch_inds, 0, :], env), + self.dones[batch_inds], + self._normalize_reward(self.rewards[batch_inds], env)) + return tuple(map(self.to_torch, data)) class RolloutBuffer(BaseBuffer): @@ -145,10 +191,16 @@ class RolloutBuffer(BaseBuffer): :param gamma: (float) Discount factor :param n_envs: (int) Number of parallel environments """ - def __init__(self, buffer_size, obs_dim, action_dim, device='cpu', - gae_lambda=1, gamma=0.99, n_envs=1): + def __init__(self, + buffer_size: 
int, + obs_dim: int, + action_dim: int, + device: Union[th.device, str] = 'cpu', + gae_lambda: float = 1, + gamma: float = 0.99, + n_envs: int = 1): + super(RolloutBuffer, self).__init__(buffer_size, obs_dim, action_dim, device, n_envs=n_envs) - # TODO: try the buffer on the gpu? self.gae_lambda = gae_lambda self.gamma = gamma self.observations, self.actions, self.rewards, self.advantages = None, None, None, None @@ -156,35 +208,41 @@ def __init__(self, buffer_size, obs_dim, action_dim, device='cpu', self.generator_ready = False self.reset() - def reset(self): - self.observations = th.zeros(self.buffer_size, self.n_envs, self.obs_dim) - self.actions = th.zeros(self.buffer_size, self.n_envs, self.action_dim) - self.rewards = th.zeros(self.buffer_size, self.n_envs) - self.returns = th.zeros(self.buffer_size, self.n_envs) - self.dones = th.zeros(self.buffer_size, self.n_envs) - self.values = th.zeros(self.buffer_size, self.n_envs) - self.log_probs = th.zeros(self.buffer_size, self.n_envs) - self.advantages = th.zeros(self.buffer_size, self.n_envs) + def reset(self) -> None: + self.observations = np.zeros((self.buffer_size, self.n_envs, self.obs_dim), dtype=np.float32) + self.actions = np.zeros((self.buffer_size, self.n_envs, self.action_dim), dtype=np.float32) + self.rewards = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + self.returns = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + self.dones = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + self.values = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + self.log_probs = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) + self.advantages = np.zeros((self.buffer_size, self.n_envs), dtype=np.float32) self.generator_ready = False super(RolloutBuffer, self).reset() - def compute_returns_and_advantage(self, last_value, dones=False, use_gae=True): + def compute_returns_and_advantage(self, + last_value: th.Tensor, + dones: np.ndarray, + use_gae: bool = True) -> None: """ Post-processing step: compute the returns (sum of discounted rewards) and advantage (A(s) = R - V(S)). Adapted from Stable-Baselines PPO2. :param last_value: (th.Tensor) - :param dones: ([bool]) + :param dones: (np.ndarray) :param use_gae: (bool) Whether to use Generalized Advantage Estimation or normal advantage for advantage computation. 
""" + # convert to numpy + last_value = last_value.clone().cpu().numpy().flatten() + if use_gae: last_gae_lam = 0 for step in reversed(range(self.buffer_size)): if step == self.buffer_size - 1: - next_non_terminal = th.FloatTensor(1.0 - dones) - next_value = last_value.clone().cpu().flatten() + next_non_terminal = 1.0 - dones + next_value = last_value else: next_non_terminal = 1.0 - self.dones[step + 1] next_value = self.values[step + 1] @@ -199,8 +257,8 @@ def compute_returns_and_advantage(self, last_value, dones=False, use_gae=True): last_return = 0.0 for step in reversed(range(self.buffer_size)): if step == self.buffer_size - 1: - next_non_terminal = th.FloatTensor(1.0 - dones) - next_value = last_value.clone().cpu().flatten() + next_non_terminal = 1.0 - dones + next_value = last_value last_return = self.rewards[step] + next_non_terminal * next_value else: next_non_terminal = 1.0 - self.dones[step + 1] @@ -208,7 +266,13 @@ def compute_returns_and_advantage(self, last_value, dones=False, use_gae=True): self.returns[step] = last_return self.advantages = self.returns - self.values - def add(self, obs, action, reward, done, value, log_prob): + def add(self, + obs: np.ndarray, + action: np.ndarray, + reward: np.ndarray, + done: np.ndarray, + value: th.Tensor, + log_prob: th.Tensor) -> None: """ :param obs: (np.ndarray) Observation :param action: (np.ndarray) Action @@ -223,19 +287,19 @@ def add(self, obs, action, reward, done, value, log_prob): # Reshape 0-d tensor to avoid error log_prob = log_prob.reshape(-1, 1) - self.observations[self.pos] = th.FloatTensor(np.array(obs).copy()) - self.actions[self.pos] = th.FloatTensor(np.array(action).copy()) - self.rewards[self.pos] = th.FloatTensor(np.array(reward).copy()) - self.dones[self.pos] = th.FloatTensor(np.array(done).copy()) - self.values[self.pos] = th.FloatTensor(value.clone().cpu().flatten()) - self.log_probs[self.pos] = th.FloatTensor(log_prob.cpu().clone()) + self.observations[self.pos] = np.array(obs).copy() + self.actions[self.pos] = np.array(action).copy() + self.rewards[self.pos] = np.array(reward).copy() + self.dones[self.pos] = np.array(done).copy() + self.values[self.pos] = value.clone().cpu().numpy().flatten() + self.log_probs[self.pos] = log_prob.clone().cpu().numpy() self.pos += 1 if self.pos == self.buffer_size: self.full = True - def get(self, batch_size=None): - assert self.full - indices = th.randperm(self.buffer_size * self.n_envs) + def get(self, batch_size: Optional[int] = None) -> Generator[Tuple[th.Tensor, ...], None, None]: + assert self.full, '' + indices = np.random.permutation(self.buffer_size * self.n_envs) # Prepare the data if not self.generator_ready: for tensor in ['observations', 'actions', 'values', @@ -252,10 +316,12 @@ def get(self, batch_size=None): yield self._get_samples(indices[start_idx:start_idx + batch_size]) start_idx += batch_size - def _get_samples(self, batch_inds, env=None): - return (self.observations[batch_inds].to(self.device), - self.actions[batch_inds].to(self.device), - self.values[batch_inds].flatten().to(self.device), - self.log_probs[batch_inds].flatten().to(self.device), - self.advantages[batch_inds].flatten().to(self.device), - self.returns[batch_inds].flatten().to(self.device)) + def _get_samples(self, batch_inds: np.ndarray, + env: Optional[VecNormalize] = None) -> Tuple[th.Tensor, ...]: + data = (self.observations[batch_inds], + self.actions[batch_inds], + self.values[batch_inds].flatten(), + self.log_probs[batch_inds].flatten(), + self.advantages[batch_inds].flatten(), + 
self.returns[batch_inds].flatten()) + return tuple(map(self.to_torch, data)) diff --git a/torchy_baselines/common/monitor.py b/torchy_baselines/common/monitor.py index 53bea70ba..8d460d276 100644 --- a/torchy_baselines/common/monitor.py +++ b/torchy_baselines/common/monitor.py @@ -1,29 +1,37 @@ -""" -Taken from stable-baselines -""" +__all__ = ['Monitor', 'get_monitor_files', 'load_results'] + import csv import json import os import time +from glob import glob +from typing import Tuple, Dict, Any, List, Optional -from gym.core import Wrapper +import gym +import pandas +import numpy as np -class Monitor(Wrapper): +class Monitor(gym.Wrapper): EXT = "monitor.csv" file_handler = None - def __init__(self, env, filename=None, allow_early_resets=True, reset_keywords=(), info_keywords=()): + def __init__(self, + env: gym.Env, + filename: Optional[str], + allow_early_resets: bool = True, + reset_keywords=(), + info_keywords=()): """ A monitor wrapper for Gym environments, it is used to know the episode reward, length, time and other data. - :param env: (Gym environment) The environment - :param filename: (str) the location to save a log file, can be None for no log + :param env: (gym.Env) The environment + :param filename: (Optional[str]) the location to save a log file, can be None for no log :param allow_early_resets: (bool) allows the reset of the environment before it is done :param reset_keywords: (tuple) extra keywords for the reset call, if extra parameters are needed at reset :param info_keywords: (tuple) extra information to log, from the information return of environment.step """ - Wrapper.__init__(self, env=env) + super(Monitor, self).__init__(env=env) self.t_start = time.time() if filename is None: self.file_handler = None @@ -52,12 +60,12 @@ def __init__(self, env, filename=None, allow_early_resets=True, reset_keywords=( self.total_steps = 0 self.current_reset_info = {} # extra info about the current episode, that was passed in during reset() - def reset(self, **kwargs): + def reset(self, **kwargs) -> np.ndarray: """ Calls the Gym environment reset. Can only be called if the environment is over, or if allow_early_resets is True :param kwargs: Extra keywords saved for the next episode. only if defined by reset_keywords - :return: ([int] or [float]) the first observation of the environment + :return: (np.ndarray) the first observation of the environment """ if not self.allow_early_resets and not self.needs_reset: raise RuntimeError("Tried to reset an environment before done. 
If you want to allow early resets, " @@ -67,16 +75,16 @@ def reset(self, **kwargs): for key in self.reset_keywords: value = kwargs.get(key) if value is None: - raise ValueError('Expected you to pass kwarg %s into reset' % key) + raise ValueError('Expected you to pass kwarg {} into reset'.format(key)) self.current_reset_info[key] = value return self.env.reset(**kwargs) - def step(self, action): + def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, Dict[Any, Any]]: """ Step the environment with the given action - :param action: ([int] or [float]) the action - :return: ([int] or [float], [float], [bool], dict) observation, reward, done, information + :param action: (np.ndarray) the action + :return: (Tuple[np.ndarray, float, bool, Dict[Any, Any]]) observation, reward, done, information """ if self.needs_reset: raise RuntimeError("Tried to step environment that needs reset") @@ -104,10 +112,11 @@ def close(self): """ Closes the environment """ + super(Monitor, self).close() if self.file_handler is not None: self.file_handler.close() - def get_total_steps(self): + def get_total_steps(self) -> int: """ Returns the total number of timesteps @@ -115,7 +124,7 @@ def get_total_steps(self): """ return self.total_steps - def get_episode_rewards(self): + def get_episode_rewards(self) -> List[float]: """ Returns the rewards of all the episodes @@ -123,7 +132,7 @@ def get_episode_rewards(self): """ return self.episode_rewards - def get_episode_lengths(self): + def get_episode_lengths(self) -> List[int]: """ Returns the number of timesteps of all the episodes @@ -131,10 +140,69 @@ def get_episode_lengths(self): """ return self.episode_lengths - def get_episode_times(self): + def get_episode_times(self) -> List[float]: """ Returns the runtime in seconds of all the episodes :return: ([float]) """ return self.episode_times + + +class LoadMonitorResultsError(Exception): + """ + Raised when loading the monitor log fails. 
+ """ + pass + + +def get_monitor_files(path: str) -> List[str]: + """ + get all the monitor files in the given path + + :param path: (str) the logging folder + :return: ([str]) the log files + """ + return glob(os.path.join(path, "*" + Monitor.EXT)) + + +def load_results(path: str) -> pandas.DataFrame: + """ + Load all Monitor logs from a given directory path matching ``*monitor.csv`` and ``*monitor.json`` + + :param path: (str) the directory path containing the log file(s) + :return: (pandas.DataFrame) the logged data + """ + # get both csv and (old) json files + monitor_files = (glob(os.path.join(path, "*monitor.json")) + get_monitor_files(path)) + if not monitor_files: + raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, path)) + data_frames = [] + headers = [] + for file_name in monitor_files: + with open(file_name, 'rt') as file_handler: + if file_name.endswith('csv'): + first_line = file_handler.readline() + assert first_line[0] == '#' + header = json.loads(first_line[1:]) + data_frame = pandas.read_csv(file_handler, index_col=None) + headers.append(header) + elif file_name.endswith('json'): # Deprecated json format + episodes = [] + lines = file_handler.readlines() + header = json.loads(lines[0]) + headers.append(header) + for line in lines[1:]: + episode = json.loads(line) + episodes.append(episode) + data_frame = pandas.DataFrame(episodes) + else: + assert 0, 'unreachable' + data_frame['t'] += header['t_start'] + data_frames.append(data_frame) + data_frame = pandas.concat(data_frames) + data_frame.sort_values('t', inplace=True) + data_frame.reset_index(inplace=True) + data_frame['t'] -= min(header['t_start'] for header in headers) + # data_frame.headers = headers # HACK to preserve backwards compatibility + return data_frame diff --git a/torchy_baselines/ppo/ppo.py b/torchy_baselines/ppo/ppo.py index c5916eab1..58c799f4f 100644 --- a/torchy_baselines/ppo/ppo.py +++ b/torchy_baselines/ppo/ppo.py @@ -275,8 +275,8 @@ def train(self, gradient_steps, batch_size=64): np.mean(approx_kl_divs))) break - explained_var = explained_variance(self.rollout_buffer.returns.flatten().cpu().numpy(), - self.rollout_buffer.values.flatten().cpu().numpy()) + explained_var = explained_variance(self.rollout_buffer.returns.flatten(), + self.rollout_buffer.values.flatten()) logger.logkv("explained_variance", explained_var) # TODO: gather stats for the entropy and other losses?