diff --git a/docs/source/fun_figs.py b/docs/source/fun_figs.py index 46a311d7..55099f02 100644 --- a/docs/source/fun_figs.py +++ b/docs/source/fun_figs.py @@ -6,8 +6,9 @@ sns.set_style("whitegrid") + # %% Banana distribution plot -def banana_plot(ax = None): +def banana_plot(ax=None): N = 500 x0 = nav.lib.SE2State([0.3, 3, 4], direction="right") covariance = np.diag([0.2**2, 0.05**2, 0.05**2]) @@ -38,15 +39,22 @@ def banana_plot(ax = None): # random greyscale color color = np.random.uniform(0.3, 0.9) - ax.plot(traj_pos[:, 0], traj_pos[:, 1], color=(color, color, color),zorder=1) + ax.plot( + traj_pos[:, 0], + traj_pos[:, 1], + color=(color, color, color), + zorder=1, + ) # save the final state final_states.append(x_traj[-1]) - + final_positions = np.array([x.position for x in final_states]) - ax.scatter(final_positions[:, 0], final_positions[:, 1], color="C0", zorder=2) + ax.scatter( + final_positions[:, 0], final_positions[:, 1], color="C0", zorder=2 + ) - # Propagate the mean with EKF + # Propagate the mean with EKF kf = nav.ExtendedKalmanFilter(process_model) x0_hat = nav.StateWithCovariance(x0, covariance) @@ -59,10 +67,12 @@ def banana_plot(ax = None): ax.plot(mean_traj[:, 0], mean_traj[:, 1], color="r", zorder=3, linewidth=3) ax.set_aspect("equal") + # banana_plot() + # %% -def pose3d_plot(ax = None): +def pose3d_plot(ax=None): N = 500 x0 = nav.lib.SE3State([0.3, 3, 4, 0, 0, 0], direction="right") process_model = nav.lib.BodyFrameVelocity(np.zeros(6)) @@ -78,13 +88,14 @@ def pose3d_plot(ax = None): x = process_model.evaluate(x, u, dt) x_traj.append(x.copy()) - fig, ax = nav.plot_poses(x_traj, ax = ax) + fig, ax = nav.plot_poses(x_traj, ax=ax) # pose3d_plot() + # %% -def three_sigma_plot(axs = None): +def three_sigma_plot(axs=None): dataset = nav.lib.datasets.SimulatedPoseRangingDataset() estimates = nav.run_filter( @@ -92,10 +103,12 @@ def three_sigma_plot(axs = None): dataset.get_ground_truth()[0], np.diag([0.1**2, 0.1**2, 0.1**2, 0.1**2, 0.1**2, 0.1**2]), dataset.get_input_data(), - dataset.get_measurement_data() - ) - - results = nav.GaussianResultList.from_estimates(estimates, dataset.get_ground_truth()) + dataset.get_measurement_data(), + ) + + results = nav.GaussianResultList.from_estimates( + estimates, dataset.get_ground_truth() + ) fig, axs = nav.plot_error(results[:, :3], axs=axs) axs[2].set_xlabel("Time (s)") @@ -105,7 +118,6 @@ def three_sigma_plot(axs = None): if __name__ == "__main__": - # Make one large figure which has all the plots. This will be a 1x3 grid, with the # last plot itself being a three vertically stacked plots. @@ -119,13 +131,11 @@ def three_sigma_plot(axs = None): # which will be used here: - - fig = plt.figure(figsize=(20, 6)) gs = fig.add_gridspec(1, 3, width_ratios=[1, 1, 1]) ax1 = fig.add_subplot(gs[0]) - ax2 = fig.add_subplot(gs[1], projection='3d') - + ax2 = fig.add_subplot(gs[1], projection="3d") + # The last plot is a 3x1 grid gs2 = gs[2].subgridspec(3, 1, hspace=0.1) ax3 = fig.add_subplot(gs2[0]) @@ -141,25 +151,27 @@ def three_sigma_plot(axs = None): ax2.set_yticklabels([]) ax2.set_zticklabels([]) - banana_plot(ax1) pose3d_plot(ax2) three_sigma_plot(np.array([ax3, ax4, ax5])) - # Set spacing to the above values + # Set spacing to the above values fig.subplots_adjust( top=0.975, bottom=0.097, left=0.025, right=0.992, hspace=0.2, - wspace=0.117 + wspace=0.117, ) - # Save the figure with transparent background, next to this file - import os - fig.savefig(os.path.join(os.path.dirname(__file__), "fun_figs.png"), transparent=True) + import os + + fig.savefig( + os.path.join(os.path.dirname(__file__), "fun_figs.png"), + transparent=True, + ) plt.show() # %% diff --git a/navlie/__init__.py b/navlie/__init__.py index 9cd28a55..45e8085d 100644 --- a/navlie/__init__.py +++ b/navlie/__init__.py @@ -40,5 +40,11 @@ jacobian, ) +from .composite import ( + CompositeState, + CompositeProcessModel, + CompositeMeasurementModel, + CompositeInput, +) -from .lib.states import StampedValue # for backwards compatibility +from .lib.states import StampedValue # for backwards compatibility diff --git a/navlie/composite.py b/navlie/composite.py new file mode 100644 index 00000000..e96c4b47 --- /dev/null +++ b/navlie/composite.py @@ -0,0 +1,573 @@ +from typing import Any, List +import numpy as np +from navlie import State, Measurement, MeasurementModel, ProcessModel, Input +from scipy.linalg import block_diag + + +class CompositeState(State): + """ + A "composite" state object intended to hold a list of State objects as a + single conceptual "state". The intended use is to hold a list of states + as a single state at a specific time, of potentially different types, + and this class will take care of defining the appropriate operations on + the composite state such as the ``plus`` and ``minus`` methods, as well + as the ``plus_jacobian`` and ``minus_jacobian`` methods. + + Each state in the provided list has an index (the index in the list), as + well as a state_id, which is found as an attribute in the corresponding State + object. + + It is possible to access sub-states in the composite states both by index + and by ID. + """ + + def __init__( + self, state_list: List[State], stamp: float = None, state_id=None + ): + """ + Parameters + ---------- + state_list: List[State] + List of State that forms this composite state + stamp: float, optional + Timestamp of the composite state. This can technically be different + from the timestamps of the substates. + state_id: Any, optional + State ID of the composite state. This can be different from the + state IDs of the substates. + """ + if isinstance(state_list, tuple): + state_list = list(state_list) + + #:List[State]: The substates are the CompositeState's value. + self.value = state_list + + self.stamp = stamp + self.state_id = state_id + + def __getstate__(self): + """ + Get the state of the object for pickling. + """ + # When using __slots__ the pickle module expects a tuple from __getstate__. + # See https://stackoverflow.com/questions/1939058/simple-example-of-use-of-setstate-and-getstate/41754104#41754104 + return ( + None, + { + "value": self.value, + "stamp": self.stamp, + "state_id": self.state_id, + }, + ) + + def __setstate__(self, attributes): + """ + Set the state of the object for unpickling. + """ + # When using __slots__ the pickle module sends a tuple for __setstate__. + # See https://stackoverflow.com/questions/1939058/simple-example-of-use-of-setstate-and-getstate/41754104#41754104 + + attributes = attributes[1] + self.value = attributes["value"] + self.stamp = attributes["stamp"] + self.state_id = attributes["state_id"] + + @property + def dof(self): + return sum([x.dof for x in self.value]) + + def get_index_by_id(self, state_id): + """ + Get index of a particular state_id in the list of states. + """ + return [x.state_id for x in self.value].index(state_id) + + def get_slices(self) -> List[slice]: + """ + Get slices for each state in the list of states. + """ + slices = [] + counter = 0 + for state in self.value: + slices.append(slice(counter, counter + state.dof)) + counter += state.dof + + return slices + + def add_state(self, state: State, stamp: float = None, state_id=None): + """Adds a state and it's corresponding slice to the composite state.""" + self.value.append(state) + + def remove_state_by_id(self, state_id): + """Removes a given state by ID.""" + idx = self.get_index_by_id(state_id) + self.value.pop(idx) + + def get_slice_by_id(self, state_id, slices=None): + """ + Get slice of a particular state_id in the list of states. + """ + + if slices is None: + slices = self.get_slices() + + idx = self.get_index_by_id(state_id) + return slices[idx] + + def get_matrix_block_by_ids( + self, mat: np.ndarray, state_id_1: Any, state_id_2: Any = None + ) -> np.ndarray: + """Gets the portion of a matrix corresponding to two states. + + This function is useful when extract specific blocks of a covariance + matrix, for example. + + Parameters + ---------- + mat : np.ndarray + N x N matrix + state_id_1 : Any + State ID of state 1. + state_id_2 : Any, optional + State ID of state 2. If None, state_id_2 is set to state_id_1. + + Returns + ------- + np.ndarray + Subblock of mat corrsponding to + slices of state_id_1 and state_id_2. + """ + + if state_id_2 is None: + state_id_2 = state_id_1 + + slice_1 = self.get_slice_by_id(state_id_1) + slice_2 = self.get_slice_by_id(state_id_2) + + return mat[slice_1, slice_2] + + def set_matrix_block_by_ids( + self, + new_mat_block: np.ndarray, + mat: np.ndarray, + state_id_1: Any, + state_id_2: Any = None, + ) -> np.ndarray: + """Sets the portion of the covariance block corresponding to two states. + + Parameters + ---------- + new_mat_block : np.ndarray + A subblock to be entered into mat. + mat : np.ndarray + Full matrix. + state_id_1 : Any + State ID of state 1. + state_id_2 : Any, optional + State ID of state 2. If None, state_id_2 is set to state_id_1. + + Returns + ------- + np.ndarray + mat with updated subblock. + """ + + if state_id_2 is None: + state_id_2 = state_id_1 + + slice_1 = self.get_slice_by_id(state_id_1) + slice_2 = self.get_slice_by_id(state_id_2) + + mat[slice_1, slice_2] = new_mat_block + return mat + + def get_value_by_id(self, state_id) -> Any: + """ + Get state value by id. + """ + idx = self.get_index_by_id(state_id) + return self.value[idx].value + + def get_state_by_id(self, state_id) -> State: + """ + Get state object by id. + """ + idx = self.get_index_by_id(state_id) + return self.value[idx] + + def get_dof_by_id(self, state_id) -> int: + """ + Get degrees of freedom of sub-state by id. + """ + idx = self.get_index_by_id(state_id) + return self.value[idx].dof + + def get_stamp_by_id(self, state_id) -> float: + """ + Get timestamp of sub-state by id. + """ + idx = self.get_index_by_id(state_id) + return self.value[idx].stamp + + def set_stamp_by_id(self, stamp: float, state_id): + """ + Set the timestamp of a sub-state by id. + """ + idx = self.get_index_by_id(state_id) + self.value[idx].stamp = stamp + + def set_state_by_id(self, state: State, state_id): + """ + Set the whole sub-state by id. + """ + idx = self.get_index_by_id(state_id) + self.value[idx] = state + + def set_value_by_id(self, value: Any, state_id: Any): + """ + Set the value of a sub-state by id. + """ + idx = self.get_index_by_id(state_id) + self.value[idx].value = value + + def set_stamp_for_all(self, stamp: float): + """ + Set the timestamp of all substates. + """ + for state in self.value: + state.stamp = stamp + + def to_list(self): + """ + Converts the CompositeState object back into a list of states. + """ + return self.value + + def copy(self) -> "CompositeState": + """ + Returns a new composite state object where the state values have also + been copied. + """ + return self.__class__( + [state.copy() for state in self.value], self.stamp, self.state_id + ) + + def plus(self, dx, new_stamp: float = None) -> "CompositeState": + """ + Updates the value of each sub-state given a dx. Interally parses + the dx vector. + """ + new = self.copy() + for i, state in enumerate(new.value): + new.value[i] = state.plus(dx[: state.dof]) + dx = dx[state.dof :] + + if new_stamp is not None: + new.set_stamp_for_all(new_stamp) + + return new + + def minus(self, x: "CompositeState") -> np.ndarray: + dx = [] + for i, v in enumerate(x.value): + dx.append( + self.value[i].minus(x.value[i]).reshape((self.value[i].dof,)) + ) + + return np.concatenate(dx).reshape((-1, 1)) + + def plus_by_id( + self, dx, state_id: int, new_stamp: float = None + ) -> "CompositeState": + """ + Updates a specific sub-state. + """ + new = self.copy() + idx = new.get_index_by_id(state_id) + new.value[idx].plus(dx) + if new_stamp is not None: + new.set_stamp_by_id(new_stamp, state_id) + + return new + + def jacobian_from_blocks(self, block_dict: dict): + """ + Returns the jacobian of the entire composite state given jacobians + associated with some of the substates. These are provided as a dictionary + with the the keys being the substate IDs. + """ + block: np.ndarray = list(block_dict.values())[0] + m = block.shape[0] # Dimension of "y" value + jac = np.zeros((m, self.dof)) + slices = self.get_slices() + for state_id, block in block_dict.items(): + slc = self.get_slice_by_id(state_id, slices) + jac[:, slc] = block + + return jac + + def plus_jacobian(self, dx: np.ndarray) -> np.ndarray: + dof = self.dof + jac = np.zeros((dof, dof)) + counter = 0 + for state in self.value: + jac[ + counter : counter + state.dof, + counter : counter + state.dof, + ] = state.plus_jacobian(dx[: state.dof]) + dx = dx[state.dof :] + counter += state.dof + + return jac + + def minus_jacobian(self, x: "CompositeState") -> np.ndarray: + dof = self.dof + jac = np.zeros((dof, dof)) + counter = 0 + for i, state in enumerate(self.value): + jac[ + counter : counter + state.dof, + counter : counter + state.dof, + ] = state.minus_jacobian(x.value[i]) + counter += state.dof + + return jac + + def __repr__(self): + substate_line_list = [] + for v in self.value: + substate_line_list.extend(v.__repr__().split("\n")) + substates_str = "\n".join([" " + s for s in substate_line_list]) + s = [ + f"{self.__class__.__name__}(stamp={self.stamp}, state_id={self.state_id}) with substates:", + substates_str, + ] + return "\n".join(s) + + +class CompositeInput(Input): + """ + + """ + + # TODO: add tests to new methods + def __init__(self, input_list: List[Input]) -> None: + self.input_list = input_list + + @property + def dof(self) -> int: + return sum([input.dof for input in self.input_list]) + + @property + def stamp(self) -> float: + return self.input_list[0].stamp + + def get_index_by_id(self, state_id): + """ + Get index of a particular state_id in the list of inputs. + """ + return [x.state_id for x in self.input_list].index(state_id) + + def add_input(self, input: Input): + """ + Adds an input to the composite input. + """ + self.input_list.append(input) + + def remove_input_by_id(self, state_id): + """ + Removes a given input by ID. + """ + idx = self.get_index_by_id(state_id) + self.input_list.pop(idx) + + def get_input_by_id(self, state_id) -> Input: + """ + Get input object by id. + """ + idx = self.get_index_by_id(state_id) + return self.input_list[idx] + + def get_dof_by_id(self, state_id) -> int: + """ + Get degrees of freedom of sub-input by id. + """ + idx = self.get_index_by_id(state_id) + return self.input_list[idx].dof + + def get_stamp_by_id(self, state_id) -> float: + """ + Get timestamp of sub-input by id. + """ + idx = self.get_index_by_id(state_id) + return self.input_list[idx].stamp + + def set_stamp_by_id(self, stamp: float, state_id): + """ + Set the timestamp of a sub-input by id. + """ + idx = self.get_index_by_id(state_id) + self.input_list[idx].stamp = stamp + + def set_input_by_id(self, input: Input, state_id): + """ + Set the whole sub-input by id. + """ + idx = self.get_index_by_id(state_id) + self.input_list[idx] = input + + def set_stamp_for_all(self, stamp: float): + """ + Set the timestamp of all subinputs. + """ + for input in self.input_list: + input.stamp = stamp + + def to_list(self): + """ + Converts the CompositeInput object back into a list of inputs. + """ + return self.input_list + + def copy(self) -> "CompositeInput": + return CompositeInput([input.copy() for input in self.input_list]) + + def plus(self, w: np.ndarray): + new = self.copy() + temp = w + for i, input in enumerate(self.input_list): + new.input_list[i] = input.plus(temp[: input.dof]) + temp = temp[input.dof :] + + return new + + +class CompositeProcessModel(ProcessModel): + """ + + Should this be called a StackedProcessModel? + # TODO: Add documentation and tests + """ + + # TODO: This needs to be expanded and/or changed. We have come across the + # following use cases: + # 1. Applying a completely seperate process model to each sub-state. + # 2. Applying a single process model to each sub-state (seperately). + # 3. Applying a single process model to one sub-state, and leaving the rest + # unchanged. + # 4. Applying process model A to some sub-states, and process model B to + # other sub-states + # 5. Receiving a CompositeInput, which is a list of synchronously-received + # inputs, and applying each input to the corresponding sub-state. + # 6. Receiving the state-specific input asynchronously, applying to the + # corresponding sub-state, and leaving the rest unchanged. Typically happens + # with case 3. + + # What they all have in common: list of decoupled process models, one per + # substate. For coupled process models, the user will have to define their + # own process model from scratch. + + def __init__( + self, + model_list: List[ProcessModel], + shared_input: bool = False, + ): + self._model_list = model_list + self._shared_input = shared_input + + def evaluate( + self, + x: CompositeState, + u: CompositeInput, + dt: float, + ) -> CompositeState: + x = x.copy() + for i, x_sub in enumerate(x.value): + if self._shared_input: + u_sub = u + else: + u_sub = u.input_list[i] + x.value[i] = self._model_list[i].evaluate(x_sub, u_sub, dt) + + return x + + def jacobian( + self, + x: CompositeState, + u: CompositeInput, + dt: float, + ) -> np.ndarray: + jac = [] + for i, x_sub in enumerate(x.value): + if self._shared_input: + u_sub = u + else: + u_sub = u.input_list[i] + jac.append(self._model_list[i].jacobian(x_sub, u_sub, dt)) + + return block_diag(*jac) + + def covariance( + self, + x: CompositeState, + u: CompositeInput, + dt: float, + ) -> np.ndarray: + cov = [] + for i, x_sub in enumerate(x.value): + if self._shared_input: + u_sub = u + else: + u_sub = u.input_list[i] + cov.append(self._model_list[i].covariance(x_sub, u_sub, dt)) + + return block_diag(*cov) + + +class CompositeMeasurementModel(MeasurementModel): + """ + Wrapper for a standard measurement model that assigns the model to a specific + substate (referenced by `state_id`) inside a CompositeState. + """ + + def __init__(self, model: MeasurementModel, state_id): + self.model = model + self.state_id = state_id + + def __repr__(self): + return f"{self.model}(of substate {self.state_id})" + + def evaluate(self, x: CompositeState) -> np.ndarray: + return self.model.evaluate(x.get_state_by_id(self.state_id)) + + def jacobian(self, x: CompositeState) -> np.ndarray: + x_sub = x.get_state_by_id(self.state_id) + jac_sub = self.model.jacobian(x_sub) + jac = np.zeros((jac_sub.shape[0], x.dof)) + slc = x.get_slice_by_id(self.state_id) + jac[:, slc] = jac_sub + return jac + + def covariance(self, x: CompositeState) -> np.ndarray: + x_sub = x.get_state_by_id(self.state_id) + return self.model.covariance(x_sub) + + +class CompositeMeasurement(Measurement): + def __init__(self, y: Measurement, state_id: Any): + """ + Converts a standard Measurement into a CompositeMeasurement, which + replaces the model with a CompositeMeasurementModel. + + Parameters + ---------- + y : Measurement + Measurement to be converted. + state_id : Any + ID of the state that the measurement will be assigned to, + as per the CompositeMeasurementModel. + """ + model = CompositeMeasurementModel(y.model, state_id) + super().__init__( + value=y.value, stamp=y.stamp, model=model, state_id=y.state_id + ) diff --git a/navlie/datagen.py b/navlie/datagen.py index da3eade9..58369a9c 100644 --- a/navlie/datagen.py +++ b/navlie/datagen.py @@ -51,9 +51,11 @@ def __init__( input_func: Callable[[float], np.ndarray], input_covariance: np.ndarray, input_freq: float, - meas_model_list: List[MeasurementModel] = [], # TODO: fix mutable default argument + meas_model_list: List[ + MeasurementModel + ] = None, meas_freq_list: Union[float, List[float]] = None, - meas_offset_list: Union[float, List[float]] = [], + meas_offset_list: Union[float, List[float]] = None, ): # Make input covariance a callable if it isnt if callable(input_covariance): @@ -63,6 +65,12 @@ def __init__( else: raise ValueError("Input covariance must be a function or a matrix.") + if meas_model_list is None: + meas_model_list = [] + + if meas_offset_list is None: + meas_offset_list = [] + if isinstance(meas_model_list, MeasurementModel): meas_model_list = [meas_model_list] @@ -111,7 +119,8 @@ def __init__( def add_measurement_model( self, model: MeasurementModel, freq: float, offset: float = 0.0 ): - """Adds a new measurement model to the existing list of measurement models. + """ + Adds a new measurement model to the existing list of measurement models. Parameters ---------- diff --git a/navlie/filters.py b/navlie/filters.py index 2158aa1f..4f72ee0c 100644 --- a/navlie/filters.py +++ b/navlie/filters.py @@ -130,9 +130,7 @@ def predict( # usually only happens on estimator start-up if x.state.stamp is None: if u.stamp is None: - raise ValueError( - "Either state or input must have a time stamp" - ) + raise ValueError("Either state or input must have a time stamp") t_km1 = u.stamp else: t_km1 = x.state.stamp diff --git a/navlie/lib/__init__.py b/navlie/lib/__init__.py index dbffb169..a37a31d0 100644 --- a/navlie/lib/__init__.py +++ b/navlie/lib/__init__.py @@ -36,6 +36,9 @@ GlobalPosition, InvariantMeasurement, PointRelativePosition, + AbsoluteVelocity, + AbsolutePosition, + PointRelativePositionSLAM, ) from .preintegration import ( diff --git a/navlie/lib/imu.py b/navlie/lib/imu.py index c780d4de..f2b88e1b 100644 --- a/navlie/lib/imu.py +++ b/navlie/lib/imu.py @@ -12,7 +12,8 @@ import numpy as np from navlie.types import ProcessModel, Input from typing import Any, List -from navlie.lib.states import CompositeState, VectorState, SE23State +from navlie.lib.states import VectorState, SE23State +from navlie.composite import CompositeState from math import factorial @@ -31,7 +32,7 @@ def __init__( state_id: Any = None, covariance: np.ndarray = None, ): - """ + """ Parameters ---------- gyro : np.ndarray with size 3 @@ -47,7 +48,7 @@ def __init__( state_id : Any, optional State ID associated with the reading, by default None covariance : np.ndarray with size 12x12, optional - Covariance matrix describing the IMU noise, by default None. + Covariance matrix describing the IMU noise, by default None. """ super().__init__(dof=12, stamp=stamp, covariance=covariance) self.gyro = np.array(gyro).ravel() #:np.ndarray: Gyro reading @@ -137,10 +138,11 @@ def random(): class IMUState(CompositeState): """ The IMU state is a composite state that contains the navigation state - (attitude, velocity, position), and the gyro and accelerometer biases. The + (attitude, velocity, position), and the gyro and accelerometer biases. The navigation state is stored as an element of :math:`SE_2(3)`, and the biases are stored as vectors. """ + def __init__( self, nav_state: np.ndarray, @@ -279,7 +281,6 @@ def jacobian_from_blocks( for you. """ - for jac in [attitude, position, velocity, bias_gyro, bias_accel]: if jac is not None: dim = jac.shape[0] @@ -343,13 +344,14 @@ def M_matrix(phi_vec): """ The M matrix from the :download:`derivation document <../../../docs/source/_static/imu.pdf>`. """ - phi_mat = SO3.wedge(phi_vec) + phi_mat = SO3.wedge(phi_vec) M = np.sum( [ - (2 / factorial(n + 2)) * np.linalg.matrix_power(phi_mat, n) for n in - range(100) - ], axis=0, - ) + (2 / factorial(n + 2)) * np.linalg.matrix_power(phi_mat, n) + for n in range(100) + ], + axis=0, + ) return M diff --git a/navlie/lib/models.py b/navlie/lib/models.py index 9f09dbc7..fa38c341 100644 --- a/navlie/lib/models.py +++ b/navlie/lib/models.py @@ -2,18 +2,21 @@ Measurement, ProcessModel, MeasurementModel, - Input, ) from navlie.lib.states import ( - CompositeState, MatrixLieGroupState, VectorState, VectorInput, ) +from navlie.composite import CompositeState, CompositeMeasurementModel from pymlg import SO2, SO3 import numpy as np from typing import List, Any -from scipy.linalg import block_diag + +from navlie.composite import ( + CompositeInput, + CompositeProcessModel, +) # For backwards compatibility, since these was moved class SingleIntegrator(ProcessModel): @@ -106,7 +109,6 @@ def jacobian(self, x, u, dt) -> np.ndarray: return Ad def covariance(self, x, u, dt) -> np.ndarray: - Ld = self.input_jacobian(dt) return Ld @ self._Q @ Ld.T @@ -189,7 +191,6 @@ def jacobian(self, x, u, dt) -> np.ndarray: return A def covariance(self, x, u, dt) -> np.ndarray: - L = self.input_jacobian(dt) return L @ self._Q @ L.T @@ -228,7 +229,7 @@ class BodyFrameVelocity(ProcessModel): .. math:: \mathbf{T}_k = \mathbf{T}_{k-1} \exp(\Delta t \mathbf{u}_{k-1}^\wedge) - This is commonly the process model associated with SE(n). + This is commonly the process model associated with SE(n). This class is comptabile with ``SO2State, SO3State, SE2State, SE3State, SE23State``. """ @@ -265,7 +266,7 @@ def covariance( class RelativeBodyFrameVelocity(ProcessModel): """ - The relative body-frame velocity process model is of the form + The relative body-frame velocity process model is of the form .. math:: \mathbf{T}_k = \exp(\Delta t \mathbf{u}_{1,k-1}^\wedge) @@ -283,7 +284,7 @@ class RelativeBodyFrameVelocity(ProcessModel): """ def __init__(self, Q1: np.ndarray, Q2: np.ndarray): - """ + """ Parameters ---------- Q1 : np.ndarray @@ -363,6 +364,7 @@ class LinearMeasurement(MeasurementModel): This class is comptabile with ``VectorState``. """ + def __init__(self, C: np.ndarray, R: np.ndarray): """ Parameters @@ -385,238 +387,11 @@ def covariance(self, x: VectorState) -> np.ndarray: return self._R -class CompositeInput(Input): - """ - - """ - # TODO: add tests to new methods - def __init__(self, input_list: List[Input]) -> None: - self.input_list = input_list - - @property - def dof(self) -> int: - return sum([input.dof for input in self.input_list]) - - @property - def stamp(self) -> float: - return self.input_list[0].stamp - - def get_index_by_id(self, state_id): - """ - Get index of a particular state_id in the list of inputs. - """ - return [x.state_id for x in self.input_list].index(state_id) - - def add_input(self, input: Input): - """ - Adds an input to the composite input. - """ - self.input_list.append(input) - - def remove_input_by_id(self, state_id): - """ - Removes a given input by ID. - """ - idx = self.get_index_by_id(state_id) - self.input_list.pop(idx) - - def get_input_by_id(self, state_id) -> Input: - """ - Get input object by id. - """ - idx = self.get_index_by_id(state_id) - return self.input_list[idx] - - def get_dof_by_id(self, state_id) -> int: - """ - Get degrees of freedom of sub-input by id. - """ - idx = self.get_index_by_id(state_id) - return self.input_list[idx].dof - - def get_stamp_by_id(self, state_id) -> float: - """ - Get timestamp of sub-input by id. - """ - idx = self.get_index_by_id(state_id) - return self.input_list[idx].stamp - - def set_stamp_by_id(self, stamp: float, state_id): - """ - Set the timestamp of a sub-input by id. - """ - idx = self.get_index_by_id(state_id) - self.input_list[idx].stamp = stamp - - def set_input_by_id(self, input: Input, state_id): - """ - Set the whole sub-input by id. - """ - idx = self.get_index_by_id(state_id) - self.input_list[idx] = input - - def set_stamp_for_all(self, stamp: float): - """ - Set the timestamp of all subinputs. - """ - for input in self.input_list: - input.stamp = stamp - - def to_list(self): - """ - Converts the CompositeInput object back into a list of inputs. - """ - return self.input_list - - def copy(self) -> "CompositeInput": - return CompositeInput([input.copy() for input in self.input_list]) - - def plus(self, w: np.ndarray): - new = self.copy() - temp = w - for i, input in enumerate(self.input_list): - new.input_list[i] = input.plus(temp[: input.dof]) - temp = temp[input.dof :] - - return new - - -class CompositeProcessModel(ProcessModel): - """ - - Should this be called a StackedProcessModel? - # TODO: Add documentation and tests - """ - - # TODO: This needs to be expanded and/or changed. We have come across the - # following use cases: - # 1. Applying a completely seperate process model to each sub-state. - # 2. Applying a single process model to each sub-state (seperately). - # 3. Applying a single process model to one sub-state, and leaving the rest - # unchanged. - # 4. Applying process model A to some sub-states, and process model B to - # other sub-states - # 5. Receiving a CompositeInput, which is a list of synchronously-received - # inputs, and applying each input to the corresponding sub-state. - # 6. Receiving the state-specific input asynchronously, applying to the - # corresponding sub-state, and leaving the rest unchanged. Typically happens - # with case 3. - - # What they all have in common: list of decoupled process models, one per - # substate. For coupled process models, the user will have to define their - # own process model from scratch. - - def __init__( - self, - model_list: List[ProcessModel], - shared_input: bool = False, - ): - self._model_list = model_list - self._shared_input = shared_input - - def evaluate( - self, - x: CompositeState, - u: CompositeInput, - dt: float, - ) -> CompositeState: - x = x.copy() - for i, x_sub in enumerate(x.value): - if self._shared_input: - u_sub = u - else: - u_sub = u.input_list[i] - x.value[i] = self._model_list[i].evaluate(x_sub, u_sub, dt) - - return x - - def jacobian( - self, - x: CompositeState, - u: CompositeInput, - dt: float, - ) -> np.ndarray: - jac = [] - for i, x_sub in enumerate(x.value): - if self._shared_input: - u_sub = u - else: - u_sub = u.input_list[i] - jac.append(self._model_list[i].jacobian(x_sub, u_sub, dt)) - - return block_diag(*jac) - - def covariance( - self, - x: CompositeState, - u: CompositeInput, - dt: float, - ) -> np.ndarray: - cov = [] - for i, x_sub in enumerate(x.value): - if self._shared_input: - u_sub = u - else: - u_sub = u.input_list[i] - cov.append(self._model_list[i].covariance(x_sub, u_sub, dt)) - - return block_diag(*cov) - - -class CompositeMeasurementModel(MeasurementModel): - """ - Wrapper for a standard measurement model that assigns the model to a specific - substate (referenced by `state_id`) inside a CompositeState. - """ - - def __init__(self, model: MeasurementModel, state_id): - self.model = model - self.state_id = state_id - - def __repr__(self): - return f"{self.model}(of substate {self.state_id})" - - def evaluate(self, x: CompositeState) -> np.ndarray: - return self.model.evaluate(x.get_state_by_id(self.state_id)) - - def jacobian(self, x: CompositeState) -> np.ndarray: - x_sub = x.get_state_by_id(self.state_id) - jac_sub = self.model.jacobian(x_sub) - jac = np.zeros((jac_sub.shape[0], x.dof)) - slc = x.get_slice_by_id(self.state_id) - jac[:, slc] = jac_sub - return jac - - def covariance(self, x: CompositeState) -> np.ndarray: - x_sub = x.get_state_by_id(self.state_id) - return self.model.covariance(x_sub) - - -class CompositeMeasurement(Measurement): - def __init__(self, y: Measurement, state_id: Any): - """ - Converts a standard Measurement into a CompositeMeasurement, which - replaces the model with a CompositeMeasurementModel. - - Parameters - ---------- - y : Measurement - Measurement to be converted. - state_id : Any - ID of the state that the measurement will be assigned to, - as per the CompositeMeasurementModel. - """ - model = CompositeMeasurementModel(y.model, state_id) - super().__init__( - value=y.value, stamp=y.stamp, model=model, state_id=y.state_id - ) - - class RangePointToAnchor(MeasurementModel): """ Range measurement from a point state to an anchor (which is also another point). I.e., the state is a vector with the first ``dim`` elements being - the position of the point. This model is of the form + the position of the point. This model is of the form .. math:: y = ||\mathbf{r}_a - \mathbf{r}|| @@ -626,10 +401,10 @@ class RangePointToAnchor(MeasurementModel): """ def __init__(self, anchor_position: List[float], R: float): - """ + """ Parameters ---------- - + anchor_position : np.ndarray or List[float] Position of anchor. R : float @@ -920,13 +695,32 @@ class RangePoseToPose(MeasurementModel): """ Range model given two absolute poses of rigid bodies, each containing a ranging tag. - """ - # TODO. tag_body_positions should be optional. argh but this will be - # a breaking change since the argument order needs to be different. + Compatible with ``SE2State, SE3State, SE23State, IMUState``. + """ def __init__( self, tag_body_position1, tag_body_position2, state_id1, state_id2, R ): + """ + Parameters + ---------- + tag_body_position1 : np.ndarray + Position of tag in body frame of Robot 1. + tag_body_position2 : np.ndarray + Position of tag in body frame of Robot 2. + state_id1 : Any + State ID of Robot 1. + state_id2 : Any + State ID of Robot 2. + R : float or np.ndarray with size 1 + Covariance associated with range measurement error. + """ + # TODO. Make tag_body_position1 and tag_body_position2 optional, with a + # default value of either [0,0] or [0,0,0] (depending on the dimension + # of the passed state). Unfortunately, changing argument order is a + # breaking change. + + self.tag_body_position1 = np.array(tag_body_position1).flatten() self.tag_body_position2 = np.array(tag_body_position2).flatten() self.state_id1 = state_id1 @@ -1074,11 +868,14 @@ def covariance(self, x: MatrixLieGroupState) -> np.ndarray: else: return self.R + class GlobalPosition(AbsolutePosition): - """ + """ This class is deprecated. Use ``AbsolutePosition`` instead. """ - pass # alias + + pass # alias + class AbsoluteVelocity(MeasurementModel): """ diff --git a/navlie/lib/states.py b/navlie/lib/states.py index 57cf31b9..96e38b87 100644 --- a/navlie/lib/states.py +++ b/navlie/lib/states.py @@ -2,7 +2,7 @@ from pymlg.numpy.base import MatrixLieGroup import numpy as np from navlie.types import State, Input -from typing import Any, List +from typing import Any try: # We do not want to make ROS a hard dependency, so we import it only if @@ -15,6 +15,9 @@ raise +from navlie.composite import CompositeState # For backwards compatibility + + class VectorState(State): """ A standard vector-based state, with value represented by a 1D numpy array. @@ -211,15 +214,16 @@ def jacobian_from_blocks(self, **kwargs) -> np.ndarray: class SO2State(MatrixLieGroupState): - """ + """ A state object for rotations in 2D. The value of this state is stored as a 2x2 numpy array representing a direct element of the SO2 group. I.e., - + .. math:: - \mathbf{C} \in \mathbb{R}^{2 \times 2}, \quad + \mathbf{C} \in \mathbb{R}^{2 \times 2}, \quad \mathbf{C}^T \mathbf{C} = \mathbf{I}, \quad \det(\mathbf{C}) = \mathbf{1} - + """ + group = SO2 def __init__( @@ -250,16 +254,17 @@ def random(stamp: float = None, state_id=None, direction="right"): class SO3State(MatrixLieGroupState): - """ + """ A state object for rotations in 3D. The value of this state is stored as a 3x3 numpy array representing a direct element of the SO3 group. I.e., - + .. math:: - + \mathbf{C} \in \mathbb{R}^{3 \times 3}, \quad \mathbf{C}^T \mathbf{C} = \mathbf{I}, \quad \det(\mathbf{C}) = \mathbf{1} - + """ + group = SO3 def __init__( @@ -351,6 +356,7 @@ class SE2State(MatrixLieGroupState): """ + group = SE2 def __init__( @@ -422,7 +428,8 @@ class SE3State(MatrixLieGroupState): \end{bmatrix}, \quad \mathbf{C} \in SO(3), \quad \mathbf{r} \in \mathbb{R}^3. - """ + """ + group = SE3 def __init__( @@ -629,10 +636,11 @@ class SL3State(MatrixLieGroupState): the SL3 group. I.e., .. math:: - + \mathbf{C} \in SL(3), \quad \mathbf{C} \in \mathbb{R}^{3 \times 3}, \quad \det(\mathbf{C}) = \mathbf{1}. - """ + """ + group = SL3 def __init__( @@ -645,347 +653,6 @@ def __init__( super().__init__(value, self.group, stamp, state_id, direction) -class CompositeState(State): - """ - A "composite" state object intended to hold a list of State objects as a - single conceptual "state". The intended use is to hold a list of states - as a single state at a specific time, of potentially different types, - and this class will take care of defining the appropriate operations on - the composite state such as the ``plus`` and ``minus`` methods, as well - as the ``plus_jacobian`` and ``minus_jacobian`` methods. - - Each state in the provided list has an index (the index in the list), as - well as a state_id, which is found as an attribute in the corresponding State - object. - - It is possible to access sub-states in the composite states both by index - and by ID. - """ - - def __init__( - self, state_list: List[State], stamp: float = None, state_id=None - ): - """ - Parameters - ---------- - state_list: List[State] - List of State that forms this composite state - stamp: float, optional - Timestamp of the composite state. This can technically be different - from the timestamps of the substates. - state_id: Any, optional - State ID of the composite state. This can be different from the - state IDs of the substates. - """ - if isinstance(state_list, tuple): - state_list = list(state_list) - - #:List[State]: The substates are the CompositeState's value. - self.value = state_list - - self.stamp = stamp - self.state_id = state_id - - def __getstate__(self): - """ - Get the state of the object for pickling. - """ - # When using __slots__ the pickle module expects a tuple from __getstate__. - # See https://stackoverflow.com/questions/1939058/simple-example-of-use-of-setstate-and-getstate/41754104#41754104 - return ( - None, - { - "value": self.value, - "stamp": self.stamp, - "state_id": self.state_id, - }, - ) - - def __setstate__(self, attributes): - """ - Set the state of the object for unpickling. - """ - # When using __slots__ the pickle module sends a tuple for __setstate__. - # See https://stackoverflow.com/questions/1939058/simple-example-of-use-of-setstate-and-getstate/41754104#41754104 - - attributes = attributes[1] - self.value = attributes["value"] - self.stamp = attributes["stamp"] - self.state_id = attributes["state_id"] - - @property - def dof(self): - return sum([x.dof for x in self.value]) - - def get_index_by_id(self, state_id): - """ - Get index of a particular state_id in the list of states. - """ - return [x.state_id for x in self.value].index(state_id) - - def get_slices(self) -> List[slice]: - """ - Get slices for each state in the list of states. - """ - slices = [] - counter = 0 - for state in self.value: - slices.append(slice(counter, counter + state.dof)) - counter += state.dof - - return slices - - def add_state(self, state: State, stamp: float = None, state_id=None): - """Adds a state and it's corresponding slice to the composite state.""" - self.value.append(state) - - def remove_state_by_id(self, state_id): - """Removes a given state by ID.""" - idx = self.get_index_by_id(state_id) - self.value.pop(idx) - - def get_slice_by_id(self, state_id, slices=None): - """ - Get slice of a particular state_id in the list of states. - """ - - if slices is None: - slices = self.get_slices() - - idx = self.get_index_by_id(state_id) - return slices[idx] - - def get_matrix_block_by_ids( - self, mat: np.ndarray, state_id_1: Any, state_id_2: Any = None - ) -> np.ndarray: - """Gets the portion of a matrix corresponding to two states. - - This function is useful when extract specific blocks of a covariance - matrix, for example. - - Parameters - ---------- - mat : np.ndarray - N x N matrix - state_id_1 : Any - State ID of state 1. - state_id_2 : Any, optional - State ID of state 2. If None, state_id_2 is set to state_id_1. - - Returns - ------- - np.ndarray - Subblock of mat corrsponding to - slices of state_id_1 and state_id_2. - """ - - if state_id_2 is None: - state_id_2 = state_id_1 - - slice_1 = self.get_slice_by_id(state_id_1) - slice_2 = self.get_slice_by_id(state_id_2) - - return mat[slice_1, slice_2] - - def set_matrix_block_by_ids( - self, - new_mat_block: np.ndarray, - mat: np.ndarray, - state_id_1: Any, - state_id_2: Any = None, - ) -> np.ndarray: - """Sets the portion of the covariance block corresponding to two states. - - Parameters - ---------- - new_mat_block : np.ndarray - A subblock to be entered into mat. - mat : np.ndarray - Full matrix. - state_id_1 : Any - State ID of state 1. - state_id_2 : Any, optional - State ID of state 2. If None, state_id_2 is set to state_id_1. - - Returns - ------- - np.ndarray - mat with updated subblock. - """ - - if state_id_2 is None: - state_id_2 = state_id_1 - - slice_1 = self.get_slice_by_id(state_id_1) - slice_2 = self.get_slice_by_id(state_id_2) - - mat[slice_1, slice_2] = new_mat_block - return mat - - def get_value_by_id(self, state_id) -> Any: - """ - Get state value by id. - """ - idx = self.get_index_by_id(state_id) - return self.value[idx].value - - def get_state_by_id(self, state_id) -> State: - """ - Get state object by id. - """ - idx = self.get_index_by_id(state_id) - return self.value[idx] - - def get_dof_by_id(self, state_id) -> int: - """ - Get degrees of freedom of sub-state by id. - """ - idx = self.get_index_by_id(state_id) - return self.value[idx].dof - - def get_stamp_by_id(self, state_id) -> float: - """ - Get timestamp of sub-state by id. - """ - idx = self.get_index_by_id(state_id) - return self.value[idx].stamp - - def set_stamp_by_id(self, stamp: float, state_id): - """ - Set the timestamp of a sub-state by id. - """ - idx = self.get_index_by_id(state_id) - self.value[idx].stamp = stamp - - def set_state_by_id(self, state: State, state_id): - """ - Set the whole sub-state by id. - """ - idx = self.get_index_by_id(state_id) - self.value[idx] = state - - def set_value_by_id(self, value: Any, state_id: Any): - """ - Set the value of a sub-state by id. - """ - idx = self.get_index_by_id(state_id) - self.value[idx].value = value - - def set_stamp_for_all(self, stamp: float): - """ - Set the timestamp of all substates. - """ - for state in self.value: - state.stamp = stamp - - def to_list(self): - """ - Converts the CompositeState object back into a list of states. - """ - return self.value - - def copy(self) -> "CompositeState": - """ - Returns a new composite state object where the state values have also - been copied. - """ - return self.__class__( - [state.copy() for state in self.value], self.stamp, self.state_id - ) - - def plus(self, dx, new_stamp: float = None) -> "CompositeState": - """ - Updates the value of each sub-state given a dx. Interally parses - the dx vector. - """ - new = self.copy() - for i, state in enumerate(new.value): - new.value[i] = state.plus(dx[: state.dof]) - dx = dx[state.dof :] - - if new_stamp is not None: - new.set_stamp_for_all(new_stamp) - - return new - - def minus(self, x: "CompositeState") -> np.ndarray: - dx = [] - for i, v in enumerate(x.value): - dx.append( - self.value[i].minus(x.value[i]).reshape((self.value[i].dof,)) - ) - - return np.concatenate(dx).reshape((-1, 1)) - - def plus_by_id( - self, dx, state_id: int, new_stamp: float = None - ) -> "CompositeState": - """ - Updates a specific sub-state. - """ - new = self.copy() - idx = new.get_index_by_id(state_id) - new.value[idx].plus(dx) - if new_stamp is not None: - new.set_stamp_by_id(new_stamp, state_id) - - return new - - def jacobian_from_blocks(self, block_dict: dict): - """ - Returns the jacobian of the entire composite state given jacobians - associated with some of the substates. These are provided as a dictionary - with the the keys being the substate IDs. - """ - block: np.ndarray = list(block_dict.values())[0] - m = block.shape[0] # Dimension of "y" value - jac = np.zeros((m, self.dof)) - slices = self.get_slices() - for state_id, block in block_dict.items(): - slc = self.get_slice_by_id(state_id, slices) - jac[:, slc] = block - - return jac - - def plus_jacobian(self, dx: np.ndarray) -> np.ndarray: - dof = self.dof - jac = np.zeros((dof, dof)) - counter = 0 - for state in self.value: - jac[ - counter : counter + state.dof, - counter : counter + state.dof, - ] = state.plus_jacobian(dx[: state.dof]) - dx = dx[state.dof :] - counter += state.dof - - return jac - - def minus_jacobian(self, x: "CompositeState") -> np.ndarray: - dof = self.dof - jac = np.zeros((dof, dof)) - counter = 0 - for i, state in enumerate(self.value): - jac[ - counter : counter + state.dof, - counter : counter + state.dof, - ] = state.minus_jacobian(x.value[i]) - counter += state.dof - - return jac - - def __repr__(self): - substate_line_list = [] - for v in self.value: - substate_line_list.extend(v.__repr__().split("\n")) - substates_str = "\n".join([" " + s for s in substate_line_list]) - s = [ - f"{self.__class__.__name__}(stamp={self.stamp}, state_id={self.state_id}) with substates:", - substates_str, - ] - return "\n".join(s) - - class VectorInput(Input): """ A standard vector-based input, with value represented by a 1D numpy array. @@ -1000,7 +667,7 @@ def __init__( state_id: Any = None, covariance=None, ): - """ + """ Parameters ---------- value : np.ndarray diff --git a/navlie/types.py b/navlie/types.py index 9774f3ff..b5b79d7f 100644 --- a/navlie/types.py +++ b/navlie/types.py @@ -51,8 +51,8 @@ class State(ABC): - a value of some sort; - a certain number of degrees of freedom (dof); - - ``plus`` and ``minus`` methods that generalize addition and subtracting to - to this object. + - ``plus`` and ``minus`` methods that generalize addition and subtracting to + to this object. Optionally, it is often useful to assign a timestamp (``stamp``) and a label (``state_id``) to differentiate state instances from others. @@ -122,8 +122,8 @@ def plus_jacobian(self, dx: np.ndarray) -> np.ndarray: .. math:: \mathbf{J} = \\frac{D (\mathcal{X} \oplus \delta \mathbf{x})}{D \delta \mathbf{x}} - - + + For Lie groups, this is known as the *group Jacobian*. """ return self.plus_jacobian_fd(dx) @@ -146,11 +146,11 @@ def plus_jacobian_fd(self, dx, step_size=1e-8) -> np.ndarray: def minus_jacobian(self, x: "State") -> np.ndarray: """ Jacobian of the ``minus`` operator with respect to self. - + .. math:: \mathbf{J} = \\frac{D (\mathcal{Y} \ominus \mathcal{X})}{D \mathcal{Y}} - + That is, if ``dx = y.minus(x)`` then this is the Jacobian of ``dx`` with respect to ``y``. For Lie groups, this is the inverse of the *group Jacobian* evaluated at ``dx = x1.minus(x2)``. @@ -274,7 +274,7 @@ class ProcessModel(ABC): of either two ways. **1. Specifying the covariance matrix directly:** - + The first way is to specify the :math:`\mathbf{Q}_k` covariance matrix directly by overriding the ``covariance`` method. This covariance matrix represents the distribution of process model errors directly. @@ -429,7 +429,7 @@ def sqrt_information(self, x: State, u: Input, dt: float) -> np.ndarray: def input_covariance(self, x: State, u: Input, dt: float) -> np.ndarray: """ - Covariance matrix of additive noise *on the input*. + Covariance matrix of additive noise *on the input*. Parameters ---------- @@ -511,7 +511,7 @@ def minus(self, y_check: np.ndarray) -> np.ndarray: class StateWithCovariance: """ - A data container containing a ``State`` object and a covariance array. + A data container containing a ``State`` object and a covariance array. This class can be used as-is without inheritance. """ diff --git a/navlie/utils.py b/navlie/utils.py index 97bdbcdb..150144ec 100644 --- a/navlie/utils.py +++ b/navlie/utils.py @@ -94,12 +94,12 @@ class GaussianResultList: results[:, 3:] # returns all degrees of freedom except the first three This can be very useful if you want to examine the nees or rmse of just some - states, or if you want to plot the error of just some states. For example, + states, or if you want to plot the error of just some states. For example, if you have an SE3State, where the first 3 degrees of freedom are attitude, and the last 3 are position, you can plot only the attitude error with .. code-block:: python - + nav.plot_error(results[:, 0:3]) and likewise get only the attitude NEES with ``results[:, 0:3].nees``. @@ -165,7 +165,6 @@ def __init__(self, result_list: List[GaussianResult]): self.value_true = np.array([r.state_true.value for r in result_list]) def __getitem__(self, key): - # TODO need more tests for all cases! if isinstance(key, tuple): if not len(key) == 2: raise IndexError("Only two dimensional indexing is supported") diff --git a/tests/unit/test_composite.py b/tests/unit/test_composite.py index 8c654ebb..d852c583 100644 --- a/tests/unit/test_composite.py +++ b/tests/unit/test_composite.py @@ -6,12 +6,12 @@ VectorInput, SE2State, VectorState, - CompositeState, ) from pymlg import SE2 import numpy as np import pickle import os +from navlie import CompositeState def test_composite_process_model(): diff --git a/tests/unit/test_meas_models.py b/tests/unit/test_meas_models.py index b4450b54..e6521539 100644 --- a/tests/unit/test_meas_models.py +++ b/tests/unit/test_meas_models.py @@ -1,14 +1,10 @@ -from navlie.lib.states import ( +from navlie.lib import ( MatrixLieGroupState, SO3State, SE2State, SE3State, SE23State, - CompositeState, - VectorState, -) -from navlie.lib.imu import IMUState -from navlie.lib.models import ( + IMUState, Altitude, GlobalPosition, InvariantMeasurement, @@ -19,8 +15,9 @@ RangePoseToPose, Gravitometer, AbsoluteVelocity, + VectorState ) -from navlie.types import Measurement, MeasurementModel +from navlie import Measurement, MeasurementModel, CompositeState from pymlg import SO3, SE3, SE2, SE3, SE23 import numpy as np import pytest diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 65b6ab5b..3c25387f 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -60,6 +60,7 @@ def test_gaussian_result_slicing(): assert np.all(grl_test.error == grl.error[:, slc]) assert np.allclose(grl_test.nees, nees_test) + def test_gaussian_result_list_slicing_equivalency(): # Construct a dummy GaussianResultList x_true = [SE23State(SE23.random(), stamp=i) for i in range(10)] @@ -67,16 +68,20 @@ def test_gaussian_result_list_slicing_equivalency(): cov = [(i + 1) * np.eye(9) for i in range(10)] x_cov = [StateWithCovariance(x, c) for x, c in zip(x_hat, cov)] gr = [GaussianResult(x, t) for x, t in zip(x_cov, x_true)] - results = GaussianResultList(gr) - - results[0:10] # returns the first 10 time steps - results[:, 0] # returns the first degree of freedom - results[0:10, 0] # returns the first degree of freedom for the first 10 time steps - results[0:10, [0, 1]] # returns the first two degrees of freedom for the first 10 time steps - results[:, 3:] # returns the all but the first three degrees of freedom - + results = GaussianResultList(gr) + + results[0:10] # returns the first 10 time steps + results[:, 0] # returns the first degree of freedom + results[ + 0:10, 0 + ] # returns the first degree of freedom for the first 10 time steps + results[ + 0:10, [0, 1] + ] # returns the first two degrees of freedom for the first 10 time steps + results[:, 3:] # returns the all but the first three degrees of freedom + assert np.all(results[0:10].error == results[0:10, :].error) - assert np.all(results[:, [0,1,2]].error == results[:, 0:3].error) + assert np.all(results[:, [0, 1, 2]].error == results[:, 0:3].error) assert np.all(results[:, 3:].error == results[:, 3:9].error)