Commit

[gym/common] Add compose reward pipeline wrapper. Support specifying reward in pipeline config.
duburcqa committed May 5, 2024
1 parent 182894f commit 948339d
Showing 7 changed files with 253 additions and 36 deletions.
22 changes: 13 additions & 9 deletions python/gym_jiminy/common/gym_jiminy/common/bases/reward.py
@@ -271,48 +271,48 @@ class BaseMixtureReward(AbstractReward):
single one.
"""

rewards: Tuple[AbstractReward, ...]
components: Tuple[AbstractReward, ...]
"""List of all the reward components that must be aggregated together.
"""

def __init__(self,
env: InterfaceJiminyEnv,
name: str,
rewards: Sequence[AbstractReward],
components: Sequence[AbstractReward],
reduce_fn: Callable[
[Sequence[Optional[float]]], Optional[float]],
is_normalized: bool) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param name: Desired name of the total reward.
:param rewards: Sequence of reward components to aggregate.
:param components: Sequence of reward components to aggregate.
:param reduce_fn: Transform function responsible for aggregating all
the reward components that were evaluated. Typical
examples are cumulative product and weighted sum.
:param is_normalized: Whether the reward is guaranteed to be normalized
after applying reduction function `reduce_fn`.
"""
# Make sure that at least one reward component has been specified
if not rewards:
if not components:
raise ValueError(
"At least one reward component must be specified.")

# Make sure that all reward components share the same environment
env = rewards[0].env
for reward in rewards[1:]:
for reward in components:
if env is not reward.env:
raise ValueError(
"All reward components must share the same environment.")

# Backup some user argument(s)
self.rewards = tuple(rewards)
self.components = tuple(components)
self._reduce_fn = reduce_fn
self._is_normalized = is_normalized

# Call base implementation
super().__init__(env, name)

# Determine whether the reward mixture is terminal
is_terminal = {reward.is_terminal for reward in self.rewards}
is_terminal = {reward.is_terminal for reward in self.components}
self._is_terminal: Optional[bool] = None
if len(is_terminal) == 1:
self._is_terminal = next(iter(is_terminal))
@@ -335,9 +335,13 @@ def compute(self, terminated: bool, info: InfoType) -> Optional[float]:
"""Evaluate each individual reward component for the current state of
the environment, then aggregate them into one.
"""
# Early return depending on whether the reward and state are terminal
if self.is_terminal is not None and self.is_terminal ^ terminated:
return None

# Compute all reward components
values = []
for reward in self.rewards:
for reward in self.components:
# Evaluate reward
reward_info: InfoType = {}
value: Optional[float] = reward(terminated, reward_info)
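For context, `reduce_fn` expects a callable mapping the sequence of per-component values (with `None` for components that were not evaluated) to an optional scalar. A minimal standalone sketch of such a function, not part of this commit, could look like this:

from typing import Optional, Sequence

def mean_reduce(values: Sequence[Optional[float]]) -> Optional[float]:
    # Average the components that were evaluated, skipping `None` entries.
    evaluated = [value for value in values if value is not None]
    if not evaluated:
        return None
    return sum(evaluated) / len(evaluated)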
32 changes: 18 additions & 14 deletions python/gym_jiminy/common/gym_jiminy/common/rewards/generic.py
@@ -62,28 +62,30 @@ class AdditiveMixtureReward(BaseMixtureReward):
"""

def __init__(self,
env: InterfaceJiminyEnv,
name: str,
rewards: Sequence[AbstractReward],
components: Sequence[AbstractReward],
weights: Optional[Sequence[float]] = None) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param name: Desired name of the total reward.
:param rewards: Sequence of rewards to aggregate.
:param components: Sequence of reward components to aggregate.
:param weights: Sequence of weights associated with each reward
components, with the same ordering as 'rewards'.
components, with the same ordering as 'components'.
Optional: 1.0 for all reward components by default.
"""
# Handling of default arguments
if weights is None:
weights = (1.0,) * len(rewards)
weights = (1.0,) * len(components)

# Make sure that the weight sequence is consistent with the rewards
if len(weights) != len(rewards):
# Make sure that the weight sequence is consistent with the components
if len(weights) != len(components):
raise ValueError(
"Exactly one weight per reward component must be specified.")

# Determine whether the cumulative reward is normalized
weight_total = 0.0
for weight, reward in zip(weights, rewards):
for weight, reward in zip(weights, components):
if not reward.is_normalized:
LOGGER.warning(
"Reward '%s' is not normalized. Aggregating rewards that "
@@ -99,7 +101,7 @@ def __init__(self,
self.weights = weights

# Call base implementation
super().__init__(name, rewards, self._reduce, is_normalized)
super().__init__(env, name, components, self._reduce, is_normalized)

def _reduce(self, values: Sequence[Optional[float]]) -> Optional[float]:
"""Compute the weighted sum of all the reward components that has been
@@ -109,7 +111,7 @@ def _reduce(self, values: Sequence[Optional[float]]) -> Optional[float]:
:param values: Sequence of scalar values for reward components that have
been evaluated, `None` otherwise, with the same ordering
as 'rewards'.
as 'components'.
:returns: Scalar value if at least one of the reward components has been
evaluated, `None` otherwise.
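To illustrate the updated constructor (the environment is now passed explicitly and the components are exposed as 'components'), here is a hedged usage sketch; `tracking_reward` and `survival_reward` stand for hypothetical, already-instantiated `AbstractReward` objects sharing the same `env`:

reward = AdditiveMixtureReward(
    env,                     # base or wrapped jiminy environment
    "reward_total",          # name of the aggregated reward
    components=(tracking_reward, survival_reward),
    weights=(0.6, 0.4))      # defaults to 1.0 per component when omitted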
@@ -144,18 +146,20 @@ class MultiplicativeMixtureReward(BaseMixtureReward):
"""

def __init__(self,
env: InterfaceJiminyEnv,
name: str,
rewards: Sequence[AbstractReward]
components: Sequence[AbstractReward]
) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param name: Desired name of the reward.
:param rewards: Sequence of rewards to aggregate.
:param components: Sequence of reward components to aggregate.
"""
# Determine whether the cumulative reward is normalized
is_normalized = all(reward.is_normalized for reward in rewards)
is_normalized = all(reward.is_normalized for reward in components)

# Call base implementation
super().__init__(name, rewards, self._reduce, is_normalized)
super().__init__(env, name, components, self._reduce, is_normalized)

def _reduce(self, values: Sequence[Optional[float]]) -> Optional[float]:
"""Compute the product of all the reward components that has been
@@ -165,7 +169,7 @@ def _reduce(self, values: Sequence[Optional[float]]) -> Optional[float]:
:param values: Sequence of scalar values for reward components that have
been evaluated, `None` otherwise, with the same ordering
as 'rewards'.
as 'components'.
:returns: Scalar value if at least one of the reward components has been
evaluated, `None` otherwise.
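The multiplicative variant follows the same pattern, under the same assumptions as the sketch above; the result is flagged as normalized only if every component is normalized:

reward = MultiplicativeMixtureReward(
    env, "reward_total", components=(tracking_reward, survival_reward))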
129 changes: 119 additions & 10 deletions python/gym_jiminy/common/gym_jiminy/common/utils/pipeline.py
@@ -23,73 +23,113 @@
BaseObserverBlock,
BasePipelineWrapper,
ObservedJiminyEnv,
ControlledJiminyEnv)
ControlledJiminyEnv,
AbstractReward,
BaseQuantityReward,
BaseMixtureReward)
from ..wrappers import ComposeReward
from ..envs import BaseJiminyEnv


class RewardConfig(TypedDict, total=False):
""" TODO: Write documentation.
"""

cls: Union[Type[AbstractReward], str]
"""Reward class type.
.. note::
Both a class type and a fully qualified dotted path are supported.
"""

kwargs: Dict[str, Any]
"""Environment constructor default arguments.
This attribute can be omitted.
"""


class EnvConfig(TypedDict, total=False):
""" TODO: Write documentation.
"""

cls: Union[Type[BaseJiminyEnv], str]
"""Environment class type.
.. note::
Both a class type and a fully qualified dotted path are supported.
"""
cls: Union[Type[BaseJiminyEnv], str]

kwargs: Dict[str, Any]
"""Environment constructor default arguments.
This attribute can be omitted.
"""
kwargs: Dict[str, Any]

reward: RewardConfig
"""Reward configuration.
This attribute can be omitted.
"""


class BlockConfig(TypedDict, total=False):
""" TODO: Write documentation.
"""

cls: Union[Type[BaseControllerBlock], Type[BaseObserverBlock], str]
"""Block class type. If must derive from `BaseControllerBlock` for
controller blocks or from `BaseObserverBlock` for observer blocks.
.. note::
Both a class type and a fully qualified dotted path are supported.
"""
cls: Union[Type[BaseControllerBlock], Type[BaseObserverBlock], str]

kwargs: Dict[str, Any]
"""Block constructor default arguments.
This attribute can be omitted.
"""
kwargs: Dict[str, Any]


class WrapperConfig(TypedDict, total=False):
""" TODO: Write documentation.
"""

cls: Union[Type[BasePipelineWrapper], str]
"""Wrapper class type.
.. note::
Both a class type and a fully qualified dotted path are supported.
"""
cls: Union[Type[BasePipelineWrapper], str]

kwargs: Dict[str, Any]
"""Wrapper constructor default arguments.
This attribute can be omitted.
"""
kwargs: Dict[str, Any]


class LayerConfig(TypedDict, total=False):
"""Block constructor default arguments.
""" TODO: Write documentation.
"""

block: BlockConfig
"""Block configuration.
This attribute can be omitted. If so, then 'wrapper_cls' must be
specified and must not require any block. Typically, it happens when the
wrapper is not doing any computation on its own but just transforming the
action or observation, e.g. stacking observation frames.
"""
block: Optional[BlockConfig]

wrapper: WrapperConfig
"""Wrapper configuration.
This attribute can be omitted. If so, then 'block' must be specified and
this block must be associated with a unique wrapper type to allow for
automatic type inference. It works with any observer and controller block.
"""
wrapper: WrapperConfig
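For reference, a hedged sketch of a single `LayerConfig` entry where only the block is specified and the wrapper type is left to automatic inference; the block class path and its arguments are placeholders:

layer_config: LayerConfig = {
    "block": {
        "cls": "my_package.blocks.MyControllerBlock",  # placeholder block
        "kwargs": {"update_ratio": 2},
    },
    # 'wrapper' omitted: the wrapper type is inferred from the block type.
}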


def build_pipeline(env_config: EnvConfig,
@@ -107,6 +147,68 @@ def build_pipeline(env_config: EnvConfig,
lowest level layer to the highest, each element corresponding to the
configuration of an individual layer, as a dict of type `LayerConfig`.
"""
# Define helper to build reward if provided
def build_reward(env: InterfaceJiminyEnv,
reward_config: RewardConfig) -> AbstractReward:
"""Instantiate a reward associated with a given environment provided
some reward configuration.
:param env: Base environment or pipeline wrapper to wrap.
:param reward_config: Configuration of the reward, as a dict of type
`RewardConfig`.
"""
# Get reward class type
cls = reward_config["cls"]
if isinstance(cls, str):
obj = locate(cls)
assert (isinstance(obj, type) and
issubclass(obj, AbstractReward))
cls = obj

# Get reward constructor keyword-arguments
kwargs = reward_config.get("kwargs", {})

# Special handling for `BaseMixtureReward`
if issubclass(cls, BaseMixtureReward):
kwargs["components"] = tuple(
build_reward(env, reward_config)
for reward_config in kwargs["components"])

# Special handling for `BaseQuantityReward`
if cls is BaseQuantityReward:
quantity_config = kwargs["quantity"]
kwargs["quantity"] = (
quantity_config["cls"], quantity_config["kwargs"])

return cls(env, **kwargs)
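For illustration, a hedged sketch of a nested configuration this helper could resolve: the mixture components are themselves `RewardConfig` dicts, handled by the `BaseMixtureReward` branch above. The `AdditiveMixtureReward` path matches the module shown earlier in this diff, while the component classes and their arguments are placeholders:

reward_config: RewardConfig = {
    "cls": "gym_jiminy.common.rewards.generic.AdditiveMixtureReward",
    "kwargs": {
        "name": "reward_total",
        "components": (
            # Each component is itself a `RewardConfig`, built recursively.
            {"cls": "my_package.rewards.MyTrackingReward",
             "kwargs": {"name": "reward_tracking", "cutoff": 0.5}},
            {"cls": "my_package.rewards.MySurvivalReward",
             "kwargs": {"name": "reward_survival"}},
        ),
        "weights": (0.6, 0.4),
    },
}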

# Define helper to build the reward wrapper if provided
def build_reward_wrapper(env_creator: Callable[..., InterfaceJiminyEnv],
reward_config: RewardConfig,
**env_kwargs: Any) -> BasePipelineWrapper:
"""Helper adding reward on top of a base environment or a pipeline
using `ComposeReward` wrapper.
:param env_creator: Callable that takes optional keyword arguments as
input and returns an pipeline or base environment.
:param reward_config: Configuration of the reward, as a dict of type
`RewardConfig`.
:param env_kwargs: Keyword arguments to forward to the constructor of
the wrapped environment. Note that it will only
overwrite the default value, so it will still be
possible to set different values by explicitly
defining them when calling the constructor of the
generated wrapper.
"""
# Instantiate the environment, which may be a lower-level wrapper
env = env_creator(**env_kwargs)

# Instantiate the reward
reward = build_reward(env, reward_config)

# Instantiate the wrapper
return ComposeReward(env, reward)

# Define helper to wrap a single layer
def build_layer(env_creator: Callable[..., InterfaceJiminyEnv],
wrapper_cls: Type[BasePipelineWrapper],
@@ -236,6 +338,13 @@ def build_layer(env_creator: Callable[..., InterfaceJiminyEnv],
block_cls_,
block_kwargs)

# Add extra user-specified reward if any
reward_config = env_config.get("reward")
if reward_config is not None:
pipeline_creator = partial(build_reward_wrapper,
pipeline_creator,
reward_config)

return pipeline_creator


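Putting it together, a hedged end-to-end sketch of the new 'reward' entry; the environment and reward classes are placeholders, and the layer sequence (assumed to be the second positional argument of `build_pipeline`) is left empty for brevity:

env_config: EnvConfig = {
    "cls": "my_package.envs.MyJiminyEnv",              # placeholder environment
    "reward": {
        "cls": "my_package.rewards.MySurvivalReward",  # placeholder reward
        "kwargs": {"name": "reward_survival"},
    },
}

# The returned callable instantiates the whole pipeline. Since the reward
# wrapper is appended last, `ComposeReward` ends up as the outermost layer.
env_creator = build_pipeline(env_config, [])
env = env_creator()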
python/gym_jiminy/common/gym_jiminy/common/wrappers/__init__.py
@@ -4,6 +4,7 @@
from .observation_stack import StackObservation
from .normalize import NormalizeAction, NormalizeObservation
from .flatten import FlattenAction, FlattenObservation
from .compose import ComposeReward


__all__ = [
@@ -13,4 +14,5 @@
'NormalizeObservation',
'FlattenAction',
'FlattenObservation',
'ComposeReward',
]
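With this re-export in place, the wrapper can also be applied manually on top of an existing pipeline, assuming `my_reward` is an `AbstractReward` instance bound to that environment (sketch):

from gym_jiminy.common.wrappers import ComposeReward

env = ComposeReward(env, my_reward)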