Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gym_jiminy/common] Add 'compute_rpy' optional argument to quaternion observers. #837

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:
jobs:
build-and-test-osx:
name: >-
(${{ matrix.PYTHON_VERSION }}) (${{ matrix.BUILD_TYPE }})
(${{ matrix.OS }}) (${{ matrix.PYTHON_VERSION }}) (${{ matrix.BUILD_TYPE }})
Build and run the unit tests. Then generate and publish the wheels on PyPi.

strategy:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Implementation of a stateless kinematic deformation estimator block
compatible with gym_jiminy reinforcement learning pipeline environment design.
"""
from collections import OrderedDict
from collections.abc import Mapping
from typing import List, Dict, Sequence, Tuple, Optional

Expand All @@ -16,6 +17,7 @@

from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv
from ..utils import (DataNested,
quat_to_rpy,
matrices_to_quat,
quat_multiply,
compute_tilt_from_quat,
Expand Down Expand Up @@ -414,8 +416,8 @@ def get_flexibility_imu_frame_chains(
return flex_imu_name_chains


class DeformationEstimator(
BaseObserverBlock[np.ndarray, np.ndarray, BaseObs, BaseAct]):
class DeformationEstimator(BaseObserverBlock[
Dict[str, np.ndarray], np.ndarray, BaseObs, BaseAct]):
"""Compute the local deformation at an arbitrary set of flexibility points
that are presumably responsible for most of the whole deformation of the
mechanical structure.
Expand Down Expand Up @@ -481,13 +483,16 @@ class DeformationEstimator(
having known orientation. The other ones will be set to identity. For
a legged robot, this corresponds to one of the contact bodies, usually
the one holding most of the total weight.

.. warning::
(2) and (3) are not supported for now, as it requires using one
additional observation layer responsible for estimating the theoretical
configuration of the robot including its freeflyer, along with the name
of the reference frame, ie the one having known orientation.

.. note::
The feature space of this observer is the same as `MahonyFilter`. See
documentation for details.

.. seealso::
Matthieu Vigne, Antonio El Khoury, Marine Pétriaux, Florent Di Meglio,
and Nicolas Petit "MOVIE: a Velocity-aided IMU Attitude Estimator for
Expand All @@ -503,8 +508,9 @@ def __init__(self,
imu_frame_names: Sequence[str],
flex_frame_names: Sequence[str],
ignore_twist: bool = True,
nested_imu_key: Sequence[str] = ("features", "mahony_filter"),
# root_frame: str = "root_joint",
nested_imu_key: Sequence[str] = (
"features", "mahony_filter", "quat"),
compute_rpy: bool = True,
update_ratio: int = 1) -> None:
"""
.. warning::
Expand Down Expand Up @@ -534,6 +540,10 @@ def __init__(self,
to the IMU quaternion estimates. Their ordering
must be consistent with the true IMU sensors of
the robot.
:param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles
representation for the 3D orientation of the IMU,
in addition to the quaternion representation.
Optional: False by default.
:param update_ratio: Ratio between the update period of the observer
and the one of the subsequent observer. -1 to
match the simulation timestep of the environment.
Expand All @@ -549,6 +559,7 @@ def __init__(self,
list, (imu_frame_names, flex_frame_names))

# Backup some of the user-argument(s)
self.compute_rpy = compute_rpy
self.ignore_twist = ignore_twist

# Create flexible dynamic model.
Expand Down Expand Up @@ -687,14 +698,21 @@ def __init__(self,
# Initialize the observer
super().__init__(name, env, update_ratio)

# Define some proxies for fast access
self._quat = self.observation["quat"]
if self.compute_rpy:
self._rpy = self.observation["rpy"]
else:
self._rpy = np.array([])

# Define chunks associated with each independent flexibility-imu chain
self._deformation_flex_quats, self._obs_imu_indices = [], []
flex_start_index = 0
for flex_frame_names_, imu_frame_names_, _ in (
self.flex_imu_frame_names_chains):
flex_last_index = flex_start_index + len(flex_frame_names_)
self._deformation_flex_quats.append(
self.observation[:, flex_start_index:flex_last_index])
self._quat[:, flex_start_index:flex_last_index])
flex_start_index = flex_last_index
imu_frame_names_filtered = tuple(filter(None, imu_frame_names_))
imu_indices = tuple(
Expand All @@ -705,10 +723,18 @@ def _initialize_observation_space(self) -> None:
nflex = sum(
len(flex_frame_names)
for flex_frame_names, _, _ in self.flex_imu_frame_names_chains)
self.observation_space = gym.spaces.Box(
observation_space: Dict[str, gym.Space] = OrderedDict()
observation_space["quat"] = gym.spaces.Box(
low=np.full((4, nflex), -1.0 - 1e-9),
high=np.full((4, nflex), 1.0 + 1e-9),
dtype=np.float64)
if self.compute_rpy:
high = np.array([np.pi, np.pi/2, np.pi]) + 1e-9
observation_space["rpy"] = gym.spaces.Box(
low=-high[:, np.newaxis].repeat(nflex, axis=1),
high=high[:, np.newaxis].repeat(nflex, axis=1),
dtype=np.float64)
self.observation_space = gym.spaces.Dict(observation_space)

def _setup(self) -> None:
# Call base implementation
Expand All @@ -725,7 +751,7 @@ def _setup(self) -> None:
self.pinocchio_data_th = self.pinocchio_model_th.createData()

# Fix initialization of the observation to be valid quaternions
self.observation[-1] = 1.0
self._quat[-1] = 1.0

# Refresh flexibility and IMU frame orientation proxies
self._kin_flex_rots.clear()
Expand Down Expand Up @@ -762,9 +788,16 @@ def _setup(self) -> None:
self._is_compiled = True

@property
def fieldnames(self) -> List[List[str]]:
return [[f"{name}.Quat{e}" for name in self.flexibility_frame_names]
for e in ("x", "y", "z", "w")]
def fieldnames(self) -> Dict[str, List[List[str]]]:
fieldnames: Dict[str, List[List[str]]] = {}
fieldnames["quat"] = [
[f"{name}.Quat{e}" for name in self.flexibility_frame_names]
for e in ("x", "y", "z", "w")]
if self.compute_rpy:
fieldnames["rpy"] = [
[".".join((name, e)) for name in self.flexibility_frame_names]
for e in ("Roll", "Pitch", "Yaw")]
return fieldnames

def refresh_observation(self, measurement: BaseObs) -> None:
# Translate encoder data at joint level
Expand Down Expand Up @@ -795,3 +828,7 @@ def refresh_observation(self, measurement: BaseObs) -> None:
self._deformation_flex_quats):
flexibility_estimator(
self._obs_imu_quats, *args, self.ignore_twist)

# Compute the RPY representation if requested
if self.compute_rpy:
quat_to_rpy(self._quat, self._rpy)
83 changes: 67 additions & 16 deletions python/gym_jiminy/common/gym_jiminy/common/blocks/mahony_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from ..bases import BaseObs, BaseAct, BaseObserverBlock, InterfaceJiminyEnv
from ..utils import (fill,
quat_to_rpy,
matrices_to_quat,
swing_from_vector,
compute_tilt_from_quat,
Expand Down Expand Up @@ -144,18 +145,33 @@ def update_twist(q: np.ndarray,
)


class MahonyFilter(
BaseObserverBlock[np.ndarray, Dict[str, np.ndarray], BaseObs, BaseAct]
):
class MahonyFilter(BaseObserverBlock[
Dict[str, np.ndarray], Dict[str, np.ndarray], BaseObs, BaseAct]):
"""Mahony's Nonlinear Complementary Filter on SO(3).

This observer estimate the 3D orientation of all the IMU of the robot from
the Gyrometer and Accelerometer measurements, i.e. the angular velocity in
local frame and the linear acceleration minus gravity in local frame.

.. seealso::
Robert Mahony, Tarek Hamel, and Jean-Michel Pflimlin "Nonlinear
Complementary Filters on the Special Orthogonal Group" IEEE
Transactions on Automatic Control, Institute of Electrical and
Electronics Engineers, 2008, 53 (5), pp.1203-1217:
https://hal.archives-ouvertes.fr/hal-00488376/document

.. note::
The feature space of this observer is a dictionary storing quaternion
estimates under key 'quat', and optionally, their corresponding
Yaw-Pitch-Roll Euler angles representations under key 'key' if
`compute_rpy=True`. Both leaves are 2D array of shape (N, M), where
N is the number of components of the representation while M is the
number of IMUs of the robot. More specifically, `N=4` for quaternions
(Qx, Qy, Qz, Qw) and 'N=3' for Euler angles (Roll-Pitch-Yaw). Indeed,
the Yaw angle of the Yaw-Pitch-Roll Euler angles representations is
systematically included in the feature space, even if its value is
meaningless, i.e. `ignore_twist=True`.

.. warning::
This filter works best for 'observe_dt' smaller or equal to 5ms. Its
performance drops rapidly beyond this point. Having 'observe_dt' equal
Expand All @@ -170,6 +186,7 @@ def __init__(self,
exact_init: bool = True,
kp: Union[np.ndarray, float] = 1.0,
ki: Union[np.ndarray, float] = 0.1,
compute_rpy: bool = True,
update_ratio: int = 1) -> None:
"""
:param name: Name of the block.
Expand All @@ -196,6 +213,10 @@ def __init__(self,
Optional: `1.0` by default.
:param mahony_ki: Integral gain used for gyro bias estimate.
Optional: `0.1` by default.
:param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles
representation for the 3D orientation of the IMU,
in addition to the quaternion representation.
Optional: False by default.
:param update_ratio: Ratio between the update period of the observer
and the one of the subsequent observer. -1 to
match the simulation timestep of the environment.
Expand All @@ -209,6 +230,7 @@ def __init__(self,
ki = np.full((num_imu_sensors,), ki)

# Backup some of the user arguments
self.compute_rpy = compute_rpy
self.exact_init = exact_init
self.kp = kp
self.ki = ki
Expand Down Expand Up @@ -263,6 +285,13 @@ def __init__(self,
# Initialize the observer
super().__init__(name, env, update_ratio)

# Define some proxies for fast access
self._quat = self.observation["quat"]
if self.compute_rpy:
self._rpy = self.observation["rpy"]
else:
self._rpy = np.array([])

def _initialize_state_space(self) -> None:
"""Configure the internal state space of the observer.

Expand Down Expand Up @@ -291,22 +320,34 @@ def _initialize_state_space(self) -> None:
def _initialize_observation_space(self) -> None:
"""Configure the observation space of the observer.

.. note::
if `compute_rpy` is enabled, then the Roll, Pitch and Yaw angles
are guaranteed to be within range [-pi,pi], [-pi/2,pi/2], [-pi,pi].

It corresponds to the current orientation estimate for all the IMUs of
the robot at once, with special treatment for their twist part. See
`__init__` documentation for details.
"""
observation_space: Dict[str, gym.Space] = OrderedDict()
num_imu_sensors = len(self.env.robot.sensors[ImuSensor.type])
self.observation_space = gym.spaces.Box(
observation_space["quat"] = gym.spaces.Box(
low=np.full((4, num_imu_sensors), -1.0 - 1e-9),
high=np.full((4, num_imu_sensors), 1.0 + 1e-9),
dtype=np.float64)
if self.compute_rpy:
high = np.array([np.pi, np.pi/2, np.pi]) + 1e-9
observation_space["rpy"] = gym.spaces.Box(
low=-high[:, np.newaxis].repeat(num_imu_sensors, axis=1),
high=high[:, np.newaxis].repeat(num_imu_sensors, axis=1),
dtype=np.float64)
self.observation_space = gym.spaces.Dict(observation_space)

def _setup(self) -> None:
# Call base implementation
super()._setup()

# Fix initialization of the observation to be valid quaternions
self.observation[-1] = 1.0
self._quat[-1] = 1.0

# Make sure observe update is discrete-time
if self.env.observe_dt <= 0.0:
Expand All @@ -317,7 +358,7 @@ def _setup(self) -> None:
self.gyro, self.acc = np.split(
self.env.sensor_measurements[ImuSensor.type], 2)

# Reset the sensor bias
# Reset the sensor bias estimate
fill(self._bias, 0)

# Reset the twist estimate
Expand Down Expand Up @@ -345,10 +386,17 @@ def get_state(self) -> Dict[str, np.ndarray]:
return self._state

@property
def fieldnames(self) -> List[List[str]]:
def fieldnames(self) -> Dict[str, List[List[str]]]:
imu_sensors = self.env.robot.sensors[ImuSensor.type]
return [[f"{sensor.name}.Quat{e}" for sensor in imu_sensors]
for e in ("x", "y", "z", "w")]
fieldnames: Dict[str, List[List[str]]] = {}
fieldnames["quat"] = [
[f"{sensor.name}.Quat{e}" for sensor in imu_sensors]
for e in ("x", "y", "z", "w")]
if self.compute_rpy:
fieldnames["rpy"] = [
[".".join((sensor.name, e)) for sensor in imu_sensors]
for e in ("Roll", "Pitch", "Yaw")]
return fieldnames

def refresh_observation(self, measurement: BaseObs) -> None:
# Re-initialize the quaternion estimate if no simulation running.
Expand All @@ -365,7 +413,7 @@ def refresh_observation(self, measurement: BaseObs) -> None:
# not very accurate because the initial acceleration is
# often jerky, plus the tilt is not observable at all.
acc = self.acc / np.linalg.norm(self.acc, axis=0)
swing_from_vector(acc, self.observation)
swing_from_vector(acc, self._quat)

self._is_initialized = True
if not self._is_initialized:
Expand All @@ -378,18 +426,17 @@ def refresh_observation(self, measurement: BaseObs) -> None:
imu_rots.append(rot)

# Convert it to quaternions
matrices_to_quat(tuple(imu_rots), self.observation)
matrices_to_quat(tuple(imu_rots), self._quat)

# Keep track of tilt if necessary
if self._update_twist:
self._twist[:] = np.arctan2(
self.observation[2], self.observation[3])
self._twist[:] = np.arctan2(self._quat[2], self._quat[3])

self._is_initialized = True
return

# Run an iteration of the filter, computing the next state estimate
mahony_filter(self.observation,
mahony_filter(self._quat,
self._omega,
self.gyro,
self.acc,
Expand All @@ -400,11 +447,15 @@ def refresh_observation(self, measurement: BaseObs) -> None:

# Remove twist if requested
if self._remove_twist:
remove_twist_from_quat(self.observation)
remove_twist_from_quat(self._quat)

if self._update_twist:
update_twist(self.observation,
update_twist(self._quat,
self._twist,
self._omega,
self.twist_time_constant_inv,
self.observe_dt)

# Compute the RPY representation if requested
if self.compute_rpy:
quat_to_rpy(self._quat, self._rpy)
Loading