From 8e6f7c86a0ad6692d8a171f0bdae74c8d2627119 Mon Sep 17 00:00:00 2001 From: Mitchell Cohen Date: Mon, 22 Jan 2024 19:29:50 -0500 Subject: [PATCH] Add simple bundle adjustment example, CameraProjection measurement model --- examples/ex_bundle_adjustment.py | 231 +++++++++++++++++++++++++++++++ navlie/lib/__init__.py | 1 + navlie/lib/camera.py | 14 +- navlie/lib/models.py | 121 +++++++++++++++- tests/unit/test_camera.py | 10 +- tests/unit/test_meas_models.py | 28 +++- 6 files changed, 388 insertions(+), 17 deletions(-) create mode 100644 examples/ex_bundle_adjustment.py diff --git a/examples/ex_bundle_adjustment.py b/examples/ex_bundle_adjustment.py new file mode 100644 index 00000000..252e156a --- /dev/null +++ b/examples/ex_bundle_adjustment.py @@ -0,0 +1,231 @@ +"""A simple stereo bundle adjustment example using navlie's nonlinear least +squares solver.""" + +import typing + +import matplotlib.pyplot as plt +import numpy as np +from pymlg import SE3 + +from navlie.batch.problem import Problem +from navlie.batch.residuals import Residual +from navlie.composite import CompositeState +from navlie.lib.camera import PinholeCamera, PoseMatrix +from navlie.lib.models import CameraProjection +from navlie.lib.states import SE3State, VectorState +from navlie.types import Measurement, State +from navlie.utils import plot_poses, randvec + +def main(): + # Simulation parameters + n_landmarks_width = 10 + n_landmarks_height = 10 + depth_landmark = 1.0 + # Generate poses and landmarks for the problem + poses = generate_poses() + landmarks = generate_landmarks( + n_landmarks_width, n_landmarks_height, depth_landmark + ) + print(f"Number of poses generated: {len(poses)}") + print(f"Number of landmarks generated: {len(landmarks)}") + + # Create a camera model and generate measurements + # T_bc is the transformation between the robot frame and the camera frame, w + # where the camera frame has the z-axis pointing through the optical center. + T_bc_0 = PoseMatrix( + SE3.from_components(PinholeCamera.get_cam_to_enu(), np.array([0.0, 0.0, 0.0])) + ) + T_bc_1 = PoseMatrix( + SE3.from_components(PinholeCamera.get_cam_to_enu(), np.array([0.0, -0.05, 0.0])) + ) + camera_0 = PinholeCamera(385, 385, 325, 235, 640, 480, 1.0, T_bc_0) + camera_1 = PinholeCamera(385, 385, 325, 235, 640, 480, 1.0, T_bc_1) + cameras = [camera_0, camera_1] + measurements = generate_measurements(poses, landmarks, cameras) + + # Generate initial guess for optimizer by perturbing all states from groundtruth + poses_init: typing.Dict[str, SE3State] = {} + for i, (pose_id, pose) in enumerate(poses.items()): + # Initialize first pose to groundtruth + if i == 0: + pose_value = pose.copy() + else: + pose_value = pose @ SE3.Exp(randvec(np.identity(6) * 0.0001)) + poses_init[pose_id] = SE3State( + value=pose_value, state_id=pose_id, direction="right" + ) + + landmarks_init: typing.Dict[str, VectorState] = {} + for landmark_id, landmark in landmarks.items(): + landmark_value = landmark + randvec(np.identity(3) * 0.01).ravel() + landmarks_init[landmark_id] = VectorState( + value=landmark_value, state_id=landmark_id + ) + + # Create and solve the problem + problem = Problem(solver="GN") + + # Add poses and landamrks to problem + for pose_id, pose in poses_init.items(): + problem.add_variable(pose_id, pose) + for landmark_id, landmark in landmarks_init.items(): + problem.add_variable(landmark_id, landmark) + + # Hold the first pose constant + problem.set_variables_constant(["p0"]) + + # Add factors to problem + for meas in measurements: + keys = meas.state_id + residual = ReprojectionResidual(keys, meas) + problem.add_residual(residual) + + # Solve the problem + opt_results = problem.solve() + variables_opt = opt_results["variables"] + print(opt_results["summary"]) + # Extract estimates + poses_est: typing.Dict[str, SE3State] = {} + landmarks_est: typing.Dict[str, VectorState] = {} + for pose_id in poses_init.keys(): + poses_est[pose_id] = variables_opt[pose_id] + for landmark_id in landmarks_init.keys(): + landmarks_est[landmark_id] = variables_opt[landmark_id] + + # Plot initial and estimated poses + init_pose_list = list(poses_init.values()) + init_landmarks_list = [x.value for x in landmarks_init.values()] + fig, ax = plot_bundle_adjustment(init_landmarks_list, init_pose_list) + fig.suptitle("Initial Guess") + opt_pose_list = list(poses_est.values()) + opt_landmarks_list = [x.value for x in landmarks_est.values()] + fig, ax = plot_bundle_adjustment(opt_landmarks_list, opt_pose_list) + fig.suptitle("Optimized Solution") + +class ReprojectionResidual(Residual): + def __init__( + self, + keys: typing.List[typing.Hashable], + meas: Measurement, + ): + super().__init__(keys) + self.meas = meas + # Evaluate the square root information a single time since it does not + # depend on the state in this case + self.sqrt_information = self.meas.model.sqrt_information([]) + + def evaluate( + self, + states: typing.List[State], + compute_jacobians: typing.List[bool] = None, + ) -> typing.Tuple[np.ndarray, typing.List[np.ndarray]]: + # Create a composite state to evaluate the measurement model + eval_state = CompositeState(states) + # Evaluate the measurement model + y_check = self.meas.model.evaluate(eval_state) + # Compute the residual + error = self.meas.value - y_check + + L = self.sqrt_information + error = L.T @ error + + if compute_jacobians: + jacobians = [None] * len(states) + full_jac = -self.meas.model.jacobian(eval_state) + if compute_jacobians[0]: + jacobians[0] = L.T @ full_jac[:, :6] + if compute_jacobians[1]: + jacobians[1] = L.T @ full_jac[:, 6:] + + return error, jacobians + return error + + +def generate_poses() -> typing.Dict[str, np.ndarray]: + """Generates some poses for the BA problem.""" + poses: typing.Dict[np.ndarray] = {} + poses["p0"] = SE3.Exp(np.array([0.2, 0, -0.2, 0.0, 0.4, 0.2])) + poses["p1"] = SE3.Exp(np.array([0, 0.1, 0, 0.1, 0.2, 0.1])) + poses["p2"] = SE3.Exp(np.array([0.5, 0.5, 0.2, 0, 0.1, 0.3])) + return poses + + +def generate_landmarks( + num_landmarks_width: int, + num_landmarks_height: int, + depth_landmark: float, +) -> typing.Dict[str, np.ndarray]: + """Generates a grid of landmarks in the plane at a fixed depth.""" + widths = np.linspace(0, 0.3, num_landmarks_width) + heights = np.linspace(0, 0.3, num_landmarks_height) + + landmarks: typing.Dict[str, np.ndarray] = {} + landmark_num = 0 + for width in widths: + for height in heights: + landmark_id = "l" + str(landmark_num) + landmarks[landmark_id] = np.array([depth_landmark, width, height]) + landmark_num += 1 + return landmarks + + +def generate_measurements( + poses: typing.Dict[str, np.ndarray], + landmarks: typing.Dict[str, np.ndarray], + cameras: typing.List[PinholeCamera], +) -> typing.List[Measurement]: + """Generates measurements of landmarks taken from a number of camera + poses.""" + meas_list: typing.List[Measurement] = [] + for pose_id, pose in poses.items(): + for landmark_id, landmark in landmarks.items(): + for camera in cameras: + # Evaluate noiseless measurement + # Create a PoseMatrix object from the pose matrix + noiseless_y = camera.evaluate(PoseMatrix(pose), landmark) + # Add noise + cov_noise = np.identity(2) * camera.sigma**2 + y_noise = noiseless_y + randvec(cov_noise).ravel() + # Check if measurement is valid + if not camera.is_measurement_valid(y_noise): + continue + state_ids = [pose_id, landmark_id] + meas_model = CameraProjection(pose_id, landmark_id, camera) + meas = Measurement(y_noise, state_id=state_ids, model=meas_model) + meas_list.append(meas) + + return meas_list + + +def plot_bundle_adjustment( + landmarks: typing.List[np.ndarray], + pose_list: typing.List[SE3State], + ax: plt.Axes = None, +): + if ax is None: + fig = plt.figure() + ax = fig.add_subplot(111, projection="3d") + + landmarks = np.array(landmarks) + ax.scatter( + landmarks[:, 0], + landmarks[:, 1], + landmarks[:, 2], + c="tab:blue", + ) + plot_poses( + pose_list, + ax=ax, + arrow_length=0.1, + step=1, + line_color="tab:blue", + label="Initial", + ) + ax.set_xlabel("x (m)") + ax.set_ylabel("y (m)") + ax.set_zlabel("z (m)") + return fig, ax + +if __name__ == "__main__": + main() + plt.show() diff --git a/navlie/lib/__init__.py b/navlie/lib/__init__.py index a37a31d0..5208f092 100644 --- a/navlie/lib/__init__.py +++ b/navlie/lib/__init__.py @@ -39,6 +39,7 @@ AbsoluteVelocity, AbsolutePosition, PointRelativePositionSLAM, + CameraProjection ) from .preintegration import ( diff --git a/navlie/lib/camera.py b/navlie/lib/camera.py index e43c5534..ac29a900 100644 --- a/navlie/lib/camera.py +++ b/navlie/lib/camera.py @@ -1,4 +1,8 @@ -"""Module containing a basic pinhole camera model.""" +"""Module containing a basic pinhole camera model. + +See the `CameraProjection` measurement model for an example of how this can be +used with navlie's `MeasurementModel` class. +""" import numpy as np from navlie.lib import SE3State @@ -23,7 +27,7 @@ def copy(self): return PoseMatrix(self.pose.copy()) -class Camera: +class PinholeCamera: """Class for a pinhole camera model. This class contains utilities for generating measurements @@ -43,7 +47,7 @@ def __init__( T_bc: PoseMatrix = None, camera_id: Any = None, ): - """Instantiate a Camera + """Instantiate a PinholeCamera Parameters ---------- @@ -125,9 +129,9 @@ def sigma_normalized_image_coords(self) -> np.ndarray: def R_normalized_image_coords(self) -> np.ndarray: return self.sigma_normalized_image_coords**2 - def copy(self) -> "Camera": + def copy(self) -> "PinholeCamera": """Returns a copy of the camera model.""" - return Camera( + return PinholeCamera( self.fu, self.fv, self.cu, diff --git a/navlie/lib/models.py b/navlie/lib/models.py index fa38c341..540c3b11 100644 --- a/navlie/lib/models.py +++ b/navlie/lib/models.py @@ -12,7 +12,7 @@ from pymlg import SO2, SO3 import numpy as np from typing import List, Any - +from navlie.lib.camera import PinholeCamera from navlie.composite import ( CompositeInput, CompositeProcessModel, @@ -494,6 +494,7 @@ def jacobian(self, x: MatrixLieGroupState) -> np.ndarray: def covariance(self, x: MatrixLieGroupState) -> np.ndarray: return self._R + class PointRelativePositionSLAM(MeasurementModel): """ Measurement model describing the position of an unknown landmark relative @@ -510,6 +511,7 @@ class PointRelativePositionSLAM(MeasurementModel): This class is comptabile with ``SE2State, SE3State, SE23State, IMUState``. """ + def __init__( self, pose_state_id: Any, @@ -563,10 +565,119 @@ def jacobian(self, x: CompositeState) -> np.ndarray: jac_dict[state_ids[0]] = pose_jacobian jac_dict[state_ids[1]] = landmark_jacobian return x.jacobian_from_blocks(jac_dict) - + + def covariance(self, x: CompositeState) -> np.ndarray: + return self._R + + +class CameraProjection(MeasurementModel): + """ + Measurement model describing the projection of a landmark into a + camera. In thihs measurement model, the robot pose and the landmark are both states. + + The camera is assumed to be a fixed transformation from the robot body frame, + where :math:`(\mathbf{C}_{bc} \in SO(3), \mathbf{r}_b^{bc} \in \mathbb{R}^3)` represent the extrinsic + transformation from the body to the camera frame. The measurement model is + then in the form + + .. math:: + \mathbf{y} = \mathbf{g} (\mathbf{r}_c^{\ell c}), + where :math:`\mathbf{r}_c^{\ell c}` is the position of the landmark resolved in the camera frame, written as + + .. math:: + \mathbf{r}_c^{\ell c} = \mathbf{C}_{bc}^T (\mathbf{C}_{ab}^T (\mathbf{r}_{\ell} - \mathbf{r}) - \mathbf{r}_b^{bc}). + + Here, the robot pose is given by :math:`(\mathbf{C}_{ab} \in SO(3), + \mathbf{r} \in \mathbb{R}^3 )`, and the landmark position is given by :math:`\mathbf{r}_\ell \in \mathbb{R}^3`. + Also note that :math:`\mathbf{g} (\cdot)` is the standard pinhole projection + model, given by + + .. math:: + g([x, y, z]^T) = [f_u * (x / z) + c_u, f_v * (y / z) + c_v]^T, + + where :math:`f_u, f_v, c_u, c_v` are the camera intrinsics. + """ + + def __init__( + self, pose_state_id: Any, landmark_state_id: Any, camera: PinholeCamera + ): + """ + Parameters + ---------- + pose_state_id : Any + The state ID of the pose state + landmark_state_id : Any + The state ID of the landmark state + camera : PinholeCamera model + The PinholeCamera model, containing the information about the camera + including the intrinsics and the extrinsics. + """ + self._pose_state_id = pose_state_id + self._landmark_state_id = landmark_state_id + self._camera = camera + + self._R = np.identity(2) * camera.sigma**2 + + def evaluate(self, x: CompositeState) -> np.ndarray: + pose: MatrixLieGroupState = x.get_state_by_id(self._pose_state_id) + landmark: VectorState = x.get_state_by_id(self._landmark_state_id) + return self._camera.evaluate(pose, landmark.value) + + def jacobian(self, x: CompositeState) -> np.ndarray: + pose: MatrixLieGroupState = x.get_state_by_id(self._pose_state_id) + landmark: VectorState = x.get_state_by_id(self._landmark_state_id) + + # Compute the Jacobian of the measurement model + if pose.direction == "right": + r_pc_c = self._camera.resolve_landmark_in_cam_frame( + pose, landmark.value + ) + dg_dr = self._compute_jac_of_projection_model(r_pc_c) + # Extract relevant states + C_bc = self._camera.T_bc.attitude + C_ab = pose.attitude + r_zw_a = pose.position.reshape((-1, 1)) + r_pw_a = landmark.value.reshape((-1, 1)) + + # Compute Jacobian with respect to the pose + jac_attitude = C_bc.T @ SO3.cross(C_ab.T @ (r_pw_a - r_zw_a)) + jac_position = -C_bc.T + # Compute Jacobian with respect to the landmark + jac_landmark = C_bc.T @ C_ab.T + + # Chain rule + jac_attitude = dg_dr @ jac_attitude + jac_position = dg_dr @ jac_position + jac_landmark = dg_dr @ jac_landmark + + pose_jacobian = pose.jacobian_from_blocks( + attitude=jac_attitude, position=jac_position + ) + if pose.direction == "left": + raise NotImplementedError("Left Jacobian not implemented") + + # Build full Jacobian + jac_dict = {} + jac_dict[self._pose_state_id] = pose_jacobian + jac_dict[self._landmark_state_id] = jac_landmark + return x.jacobian_from_blocks(jac_dict) + def covariance(self, x: CompositeState) -> np.ndarray: return self._R + def _compute_jac_of_projection_model( + self, r_pc_c: np.ndarray + ) -> np.ndarray: + x, y, z = r_pc_c.ravel() + fu = self._camera.fu + fv = self._camera.fv + dg_dr = (1.0 / z) * np.array( + [[fu, 0, -fu * x / z], [0, fv, -fv * y / z]] + ) + + return dg_dr + + class InvariantPointRelativePosition(MeasurementModel): def __init__(self, y: np.ndarray, model: PointRelativePosition): self.y = y.ravel() @@ -698,16 +809,17 @@ class RangePoseToPose(MeasurementModel): Compatible with ``SE2State, SE3State, SE23State, IMUState``. """ + def __init__( self, tag_body_position1, tag_body_position2, state_id1, state_id2, R ): - """ + """ Parameters ---------- tag_body_position1 : np.ndarray Position of tag in body frame of Robot 1. tag_body_position2 : np.ndarray - Position of tag in body frame of Robot 2. + Position of tag in body frame of Robot 2. state_id1 : Any State ID of Robot 1. state_id2 : Any @@ -720,7 +832,6 @@ def __init__( # of the passed state). Unfortunately, changing argument order is a # breaking change. - self.tag_body_position1 = np.array(tag_body_position1).flatten() self.tag_body_position2 = np.array(tag_body_position2).flatten() self.state_id1 = state_id1 diff --git a/tests/unit/test_camera.py b/tests/unit/test_camera.py index 466b2b25..6cac0b84 100644 --- a/tests/unit/test_camera.py +++ b/tests/unit/test_camera.py @@ -1,13 +1,13 @@ -from navlie.lib.camera import Camera, PoseMatrix +from navlie.lib.camera import PinholeCamera, PoseMatrix import numpy as np from pymlg import SE3 def test_valid_measurements(): # Create camera with its z-axis pointing forward - C_bc = Camera.get_cam_to_enu() + C_bc = PinholeCamera.get_cam_to_enu() T_bc = PoseMatrix(SE3.from_components(C_bc, np.array([0, 0, 0]))) - camera = Camera(385, 385, 323, 236, 480, 640, 0.1, T_bc) + camera = PinholeCamera(385, 385, 323, 236, 480, 640, 0.1, T_bc) # Create some invalid measurements invalid_meas = [ @@ -51,7 +51,7 @@ def test_valid_measurements(): def test_project_function(): - camera = Camera(385, 385, 323, 236, 480, 640, 0.1) + camera = PinholeCamera(385, 385, 323, 236, 480, 640, 0.1) # Define a landmark resolved in the camera frame r_pc_c = np.random.randn(3, 1) # Project onto image plane @@ -69,7 +69,7 @@ def test_camera_intrinsics(): fv = 385 cu = 323 cv = 236 - camera = Camera(fu, fv, cu, cv, 480, 640, 0.1) + camera = PinholeCamera(fu, fv, cu, cv, 480, 640, 0.1) K = camera.intrinsics assert K[0, 0] == fu diff --git a/tests/unit/test_meas_models.py b/tests/unit/test_meas_models.py index e6521539..f03e3d91 100644 --- a/tests/unit/test_meas_models.py +++ b/tests/unit/test_meas_models.py @@ -15,8 +15,10 @@ RangePoseToPose, Gravitometer, AbsoluteVelocity, - VectorState + VectorState, + CameraProjection, ) +from navlie.lib.camera import PinholeCamera from navlie import Measurement, MeasurementModel, CompositeState from pymlg import SO3, SE3, SE2, SE3, SE23 import numpy as np @@ -245,6 +247,28 @@ def test_magnetometer_se23(direction): _jacobian_test(x, model) +def test_camera_projection(direction: str = "right"): + # TODO: Jacobians are only derived for the right pertubation + # Implement left perturbation Jacobians + pose = SE3State( + SE3.Exp([0, 1, 2, 4, 5, 6]), + stamp=0.0, + state_id="p", + direction=direction, + ) + landmark = VectorState([1, 2, 3], 0.0, "l") + x = CompositeState([pose, landmark], stamp=0.0) + T_bc = SE3State( + value=SE3.from_components( + PinholeCamera.get_cam_to_enu(), np.array([0.0, 0.0, 0.0]) + ) + ) + camera = PinholeCamera(385, 385, 325, 235, 640, 480, 1.0, T_bc) + model = CameraProjection(pose.state_id, landmark.state_id, camera) + + _jacobian_test(x, model) + + def test_invariant_magnetometer_so3(): x = SO3State( SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction="left" @@ -341,4 +365,4 @@ def test_landmark_relative_position_se23(direction): if __name__ == "__main__": - test_range_pose_to_pose_se3() + test_camera_projection()