Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(obstacle_stop_planner): add test #599

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions planning/obstacle_stop_planner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ rclcpp_components_register_node(obstacle_stop_planner
EXECUTABLE obstacle_stop_planner_node
)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
find_package(launch_testing_ament_cmake REQUIRED)
add_launch_test(test/test_obstacle_stop_planner_node.test.py)

install(DIRECTORY
#test/config
DESTINATION share/${PROJECT_NAME}/
)
endif()

ament_auto_package(
INSTALL_TO_SHARE
config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
emergency_stop_acceleration: -5.0 # supposed minimum acceleration (deceleration) in emergency stop [m/ss]
emergency_stop_idling_time: 0.5 # supposed idling time to start emergency stop [s]
min_dist_stop: 4.0 # minimum distance of emergency stop [m]
obstacle_emergency_stop_acceleration: -5.0 # supposed minimum acceleration (deceleration) in emergency stop [m/ss]
max_standard_acceleration: 0.5 # supposed maximum acceleration in active cruise control [m/ss]
min_standard_acceleration: -1.0 # supposed minimum acceleration (deceleration) in active cruise control
standard_idling_time: 0.5 # supposed idling time to react object in active cruise control [s]
min_dist_standard: 4.0 # minimum distance in active cruise control [m]
obstacle_min_standard_acceleration: -1.5 # supposed minimum acceleration of forward obstacle [m/ss]
margin_rate_to_change_vel: 0.3 # margin to insert upper velocity [-]
use_time_compensation_to_calc_distance: true # use time-compensation to calculate distance to forward vehicle

# pid parameter for ACC
p_coefficient_positive: 0.1 # coefficient P in PID control (used when target dist -current_dist >=0) [-]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**:
ros__parameters:
# ego
ego_nearest_dist_threshold: 3.0 # [m]
ego_nearest_yaw_threshold: 1.046 # [rad] = 60 [deg]
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

stop_planner:
stop_margin: 5.0 # stop margin distance from obstacle on the path [m]
min_behavior_stop_margin: 2.0 # stop margin distance when any other stop point is inserted in stop margin [m]
min_behavior_stop_margin: 5.0 # stop margin distance when any other stop point is inserted in stop margin [m]
takayuki5168 marked this conversation as resolved.
Show resolved Hide resolved
step_length: 1.0 # step length for pointcloud search range [m]
extend_distance: 0.0 # extend trajectory to consider after goal obstacle in the extend_distance
expand_stop_range: 0.0 # margin of vehicle footprint [m]
Expand All @@ -13,7 +13,7 @@
slow_down_planner:
# slow down planner parameters
forward_margin: 5.0 # margin distance from slow down point to vehicle front [m]
backward_margin: 5.0 # margin distance from slow down point to vehicle rear [m]
backward_margin: 0.0 # margin distance from slow down point to vehicle rear [m]
expand_slow_down_range: 1.0 # offset from vehicle side edge for expanding the search area of the surrounding point cloud [m]
max_slow_down_vel: 1.38 # max slow down velocity [m/s]
min_slow_down_vel: 0.28 # min slow down velocity [m/s]
Expand All @@ -22,7 +22,7 @@
consider_constraints: False # set "True", if no decel plan found under jerk/dec constrains, relax target slow down vel
forward_margin_min: 1.0 # min margin for relaxing slow down margin [m/s]
forward_margin_span: -0.1 # fineness param for relaxing slow down margin [m/s]
jerk_min_slow_down: -0.3 # min slow down jerk constraint [m/sss]
jerk_min_slow_down: -0.6 # min slow down jerk constraint [m/sss]
jerk_span: -0.01 # fineness param for planning deceleration jerk [m/sss]
jerk_start: -0.1 # init jerk used for deceleration planning [m/sss]
slow_down_vel: 1.38 # target slow down velocity [m/s]
3 changes: 3 additions & 0 deletions planning/obstacle_stop_planner/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

<test_depend>ament_lint_auto</test_depend>
<test_depend>autoware_lint_common</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>

<export>
<build_type>ament_cmake</build_type>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
# Copyright 2022 Tier IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import math
import struct
import unittest

import autoware_auto_perception_msgs.msg
import autoware_auto_planning_msgs.msg
import launch
import launch_ros
from launch_ros.substitutions import FindPackageShare
import launch_testing
import nav_msgs.msg
import pytest
import rclpy
from ros2param.api import call_get_parameters
import sensor_msgs.msg


@pytest.mark.launch_test
def generate_test_description():

obstacle_stop_planner_node = launch_ros.actions.Node(
package="obstacle_stop_planner",
executable="obstacle_stop_planner_node",
name="obstacle_stop_planner_node",
remappings=[
("~/input/pointcloud", "input/pointcloud"),
("~/input/trajectory", "input/trajectory"),
("~/input/odometry", "input/odometry"),
("~/input/objects", "input/objects"),
("~/output/trajectory", "output/trajectory"),
],
parameters=[
[FindPackageShare("vehicle_info_util"), "/config/vehicle_info.param.yaml"],
[FindPackageShare("obstacle_stop_planner"), "/config/obstacle_stop_planner.param.yaml"],
[FindPackageShare("obstacle_stop_planner"), "/config/default_common.param.yaml"],
[
FindPackageShare("obstacle_stop_planner"),
"/config/default_nearest_search.param.yaml",
],
[
FindPackageShare("obstacle_stop_planner"),
"/config/adaptive_cruise_control.param.yaml",
],
],
)
static_transform_publisher_node = launch_ros.actions.Node(
package="tf2_ros",
executable="static_transform_publisher",
name="static_transform_publisher_node",
arguments=["0", "0", "0", "0", "0", "0", "map", "base_link"],
)

return (
launch.LaunchDescription(
[
obstacle_stop_planner_node,
static_transform_publisher_node,
# Start tests right away - no need to wait for anything
launch_testing.actions.ReadyToTest(),
]
),
{
"obstacle_stop_planner": obstacle_stop_planner_node,
"static_transform_publisher": static_transform_publisher_node,
},
)


class TestObstacleStopPlannerLink(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Initialize the ROS context for the test node
rclpy.init()

@classmethod
def tearDownClass(cls):
# Shutdown the ROS context
rclpy.shutdown()

def setUp(self):
# Create a ROS node for tests
self.node = rclpy.create_node("test_object_stop_planner_link")
self.event_name = "test_object_stop_planner"
params = call_get_parameters(
node=self.node,
node_name="/obstacle_stop_planner_node",
parameter_names=[
"stop_planner.stop_margin",
"wheel_base",
"front_overhang",
"stop_planner.expand_stop_range",
"wheel_tread",
"left_overhang",
"right_overhang",
],
)
self.stop_margin = params.values[0].double_value
wheel_base = params.values[1].double_value
front_overhang = params.values[2].double_value
self.base_link_to_front = wheel_base + front_overhang
expand_stop_range = params.values[3].double_value
wheel_tread = params.values[4].double_value
left_overhang = params.values[5].double_value
right_overhang = params.values[6].double_value
self.left_limit_line = wheel_tread / 2.0 + left_overhang + expand_stop_range
self.right_limit_line = wheel_tread / 2.0 + right_overhang + expand_stop_range

def tearDown(self):
takayuki5168 marked this conversation as resolved.
Show resolved Hide resolved
self.node.destroy_node()

def init_messages(self):
# message init
# frame id
map_frame = "map"
base_link_frame = "base_link"

# pointcloud2
self.pointcloud_msg = sensor_msgs.msg.PointCloud2()
self.pointcloud_msg.header.frame_id = base_link_frame
self.pointcloud_msg.header.stamp = self.node.get_clock().now().to_msg()
self.pointcloud_msg.height = 1
self.pointcloud_msg.width = 0
field = sensor_msgs.msg.PointField()
field.name = "x"
field.offset = 0
field.datatype = 7
field.count = 1
self.pointcloud_msg.fields.append(copy.deepcopy(field))
field.name = "y"
field.offset = 4
self.pointcloud_msg.fields.append(copy.deepcopy(field))
field.name = "z"
field.offset = 8
self.pointcloud_msg.fields.append(copy.deepcopy(field))
self.pointcloud_msg.is_bigendian = False
self.pointcloud_msg.point_step = 16
self.pointcloud_msg.row_step = self.pointcloud_msg.width * self.pointcloud_msg.point_step
self.pointcloud_msg.is_dense = True

# trajectory
self.trajectory_resolution = 0.1
self.trajectory_length = 100.0
trajectory_element = int(self.trajectory_length / self.trajectory_resolution)
self.trajectory_msg = autoware_auto_planning_msgs.msg.Trajectory()
self.trajectory_msg.header.frame_id = map_frame
self.trajectory_msg.header.stamp = self.node.get_clock().now().to_msg()
for x in range(trajectory_element):
trajectory_point = autoware_auto_planning_msgs.msg.TrajectoryPoint()
trajectory_point.pose.position.x = x * self.trajectory_resolution
trajectory_point.pose.orientation.w = 1.0
trajectory_point.longitudinal_velocity_mps = 25 / 3
self.trajectory_msg.points.append(copy.deepcopy(trajectory_point))

# odometry
self.odom_msg = nav_msgs.msg.Odometry()
self.odom_msg.header.frame_id = map_frame
self.odom_msg.header.stamp = self.node.get_clock().now().to_msg()
self.odom_msg.child_frame_id = base_link_frame
self.odom_msg.pose.pose.orientation.w = 1.0
self.odom_msg.twist.twist.linear.x = 0.0

# object
self.object_msg = autoware_auto_perception_msgs.msg.PredictedObjects()
self.object_msg.header.frame_id = map_frame
self.object_msg.header.stamp = self.node.get_clock().now().to_msg()

def set_pointcloud_messages(self, x, y):
self.pointcloud_msg.width = 1
self.pointcloud_msg.row_step = self.pointcloud_msg.width * self.pointcloud_msg.point_step
self.pointcloud_msg.data = []
for i in range(0, self.pointcloud_msg.width):
self.pointcloud_msg.data = (
self.point_to_list(x, y, i / 2, 0.0) + self.pointcloud_msg.data
)

def point_to_list(self, x, y, z, i):
point_list = (
struct.pack("<f", float(x))
+ struct.pack("<f", float(y))
+ struct.pack("<f", float(z))
+ struct.pack("<f", float(i))
)
return point_list

def clip(self, val, min_val, max_val):
return max(min(val, max_val), min_val)

def trajectory_evaluation(self, x, y, trajectory):
if self.left_limit_line > y and y > -self.right_limit_line:
# Inside the trajectory
stop_line = self.clip(
x - (self.stop_margin + self.base_link_to_front), 0, self.trajectory_length
)
stop_element = int(stop_line / self.trajectory_resolution) + 1
takayuki5168 marked this conversation as resolved.
Show resolved Hide resolved
check_vel = trajectory.points[stop_element].longitudinal_velocity_mps == 0.0
return check_vel
else:
# Outside the trajectory
check_vel = math.isclose(
trajectory.points[-1].longitudinal_velocity_mps,
self.trajectory_msg.points[-1].longitudinal_velocity_mps,
abs_tol=0.01,
)
return check_vel

def trajectory_callback(self, msg):
result_val = self.trajectory_evaluation(self.current_x, self.current_y, msg)
# print("x:",self.current_x," , y:",self.current_y," , result:",result_val)
self.callback_flag = True
if result_val:
self.true_count += 1
else:
print("Test failed at (x:", self.current_x, ", y:", self.current_y, ")")

def test_linear_trajectory(self):
# publisher
pointcloud_pub = self.node.create_publisher(
sensor_msgs.msg.PointCloud2, "input/pointcloud", 10
)
trajectory_pub = self.node.create_publisher(
autoware_auto_planning_msgs.msg.Trajectory, "input/trajectory", 10
)
odom_pub = self.node.create_publisher(nav_msgs.msg.Odometry, "input/odometry", 10)
object_pub = self.node.create_publisher(
autoware_auto_perception_msgs.msg.PredictedObjects, "input/objects", 10
)
# subscliber
self.node.create_subscription(
autoware_auto_planning_msgs.msg.Trajectory,
"output/trajectory",
self.trajectory_callback,
10,
)

self.init_messages()

# param [m]
y_point_resolution = 0.4
y_point_test_margin = 0.5
x_point_resolution = 10.0

self.current_x = 0.0
self.current_y = 0.0
self.callback_flag = False
self.true_count = 0

# test points list
test_point_x = list(range(0, int(self.trajectory_length / x_point_resolution), 1))
test_point_x = [n * x_point_resolution for n in test_point_x]
test_point_y = list(
range(
-int((self.right_limit_line + y_point_test_margin) / y_point_resolution),
int((self.left_limit_line + y_point_test_margin) / y_point_resolution),
1,
)
)
test_point_y = [n * y_point_resolution for n in test_point_y]
test_point_y.append(-self.right_limit_line)
test_point_y.append(self.left_limit_line)
# test_point_x.sort()
test_point_y.sort()

# while rclpy.ok():
for ix in test_point_x:
for iy in test_point_y:
self.current_x = float(ix)
self.current_y = float(iy)
self.set_pointcloud_messages(self.current_x, self.current_y)

# update time stamp
self.object_msg.header.stamp = self.node.get_clock().now().to_msg()
self.odom_msg.header.stamp = self.node.get_clock().now().to_msg()
self.pointcloud_msg.header.stamp = self.node.get_clock().now().to_msg()
self.trajectory_msg.header.stamp = self.node.get_clock().now().to_msg()

object_pub.publish(self.object_msg)
odom_pub.publish(self.odom_msg)
pointcloud_pub.publish(self.pointcloud_msg)
trajectory_pub.publish(self.trajectory_msg)

self.callback_flag = False
while not self.callback_flag:
rclpy.spin_once(self.node, timeout_sec=0.1)
self.assertEqual(self.true_count, len(test_point_x) * len(test_point_y))


@launch_testing.post_shutdown_test()
class TestProcessOutput(unittest.TestCase):
def test_exit_code(self, proc_info):
# Check that all processes in the launch (in this case, there's just one) exit with code 0
launch_testing.asserts.assertExitCodes(proc_info)