Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(distortion_corrector_node): replace imu and twist callback with polling subscriber #10057

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

interimadd
Copy link
Contributor

Description

Background

The distortion corrector node subscribes to twist and IMU topics using callbacks.
When these topics are published at a high frequency (e.g., 100Hz), the overhead of calling the callback function is significant.

Changed

Replaced the subscription callback function with polling using take().

Related links

Private Links:

How was this PR tested?

Test Item 1: Output Unchanged

Verified that the output topics (IMU, twist, and point cloud) remain the same before and after the change using a pytest launch test.
The test script is shown below:

#!/usr/bin/env python3

import math
import unittest
import struct
import time
from typing import List
from typing import Tuple
import launch
import launch_ros.actions
import numpy as np

import launch_testing
import launch_testing.actions

import pytest

from rclpy.node import Node

from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy

import rclpy
from rclpy.time import Time
import rclpy.executors
from std_msgs.msg import Int32

from geometry_msgs.msg import TransformStamped
from geometry_msgs.msg import TwistWithCovarianceStamped
from sensor_msgs.msg import PointCloud2
from sensor_msgs.msg import PointField
from sensor_msgs.msg import Imu
from sensor_msgs_py import point_cloud2
from std_msgs.msg import Header


def create_header(timestamp: Time) -> Header:
    header = Header()
    header.stamp = timestamp.to_msg()
    header.frame_id = "base_link"
    return header


def generate_twist_msg(
    timestamp: Time, linear: Tuple[float, float, float], angular: Tuple[float, float, float]
) -> TwistWithCovarianceStamped:
    twist_msg = TwistWithCovarianceStamped()
    twist_header = create_header(timestamp)
    twist_msg.header = twist_header
    twist_msg.twist.twist.linear.x = linear[0]
    twist_msg.twist.twist.linear.y = linear[1]
    twist_msg.twist.twist.linear.z = linear[2]
    twist_msg.twist.twist.angular.x = angular[0]
    twist_msg.twist.twist.angular.y = angular[1]
    twist_msg.twist.twist.angular.z = angular[2]

    return twist_msg


def generate_imu_msg(
    timestamp: Time, angular_velocity: Tuple[float, float, float]
) -> Imu:
    imu_msg = Imu()
    imu_msg.header = create_header(timestamp)
    imu_msg.angular_velocity.x = angular_velocity[0]
    imu_msg.angular_velocity.y = angular_velocity[1]
    imu_msg.angular_velocity.z = angular_velocity[2]

    return imu_msg


def generate_pointcloud_msg(
    timestamp: Time, time_inclement_ns: float, points: List[Tuple[float, float, float]], azimuths: List[float]
) -> PointCloud2:
    data_length = len(points)
    if len(azimuths) != data_length:
        raise ValueError("The length of points and azimuths should be the same.")

    header = create_header(timestamp)
    intensities = [255] * data_length
    return_types = [1] * data_length
    channels = [1] * data_length
    elevations = [0.0] * data_length
    distances = [1.0] * data_length
    timestamps = [int(float(i) * time_inclement_ns) for i in range(data_length)]

    pointcloud_data = bytearray()

    for i in range(data_length):
        pointcloud_data += struct.pack("fff", points[i][0], points[i][1], points[i][2])
        pointcloud_data += struct.pack("B", intensities[i])
        pointcloud_data += struct.pack("B", return_types[i])
        pointcloud_data += struct.pack("H", channels[i])
        pointcloud_data += struct.pack("f", azimuths[i])
        pointcloud_data += struct.pack("f", elevations[i])
        pointcloud_data += struct.pack("f", distances[i])
        pointcloud_data += struct.pack("I", timestamps[i])

    fields = [
        PointField(name="x", offset=0, datatype=PointField.FLOAT32, count=1),
        PointField(name="y", offset=4, datatype=PointField.FLOAT32, count=1),
        PointField(name="z", offset=8, datatype=PointField.FLOAT32, count=1),
        PointField(name="intensity", offset=12, datatype=PointField.UINT8, count=1),
        PointField(name="return_type", offset=13, datatype=PointField.UINT8, count=1),
        PointField(name="channel", offset=14, datatype=PointField.UINT16, count=1),
        PointField(name="azimuth", offset=16, datatype=PointField.FLOAT32, count=1),
        PointField(name="elevation", offset=20, datatype=PointField.FLOAT32, count=1),
        PointField(name="distance", offset=24, datatype=PointField.FLOAT32, count=1),
        PointField(name="time_stamp", offset=28, datatype=PointField.UINT32, count=1),
    ]

    pointcloud_msg = PointCloud2(
        header=header,
        height=1,
        width=data_length,
        is_dense=True,
        is_bigendian=False,
        point_step=32,  # 3*4 + 1 + 1 + 2 + 4 + 4 + 4 + 4 = 32 bytes per point
        row_step=32 * data_length,
        fields=fields,
        data=pointcloud_data,
    )

    return pointcloud_msg


def get_output_points(cloud_msg) -> np.ndarray:
    points_list = []
    for point in point_cloud2.read_points(cloud_msg, field_names=("x", "y", "z")):
        points_list.append([point[0], point[1], point[2]])
    points = np.array(points_list, dtype=np.float32)
    return points


@pytest.mark.launch_test
def generate_test_description():
    return launch.LaunchDescription([
        launch_ros.actions.Node(
            package='autoware_pointcloud_preprocessor',
            executable='distortion_corrector_node',
            name='distortion_corrector_node',
            output='screen',
            parameters=[{
                'use_sim_time': True,
                'base_frame': "base_link",
                'use_imu': True,
                'use_3d_distortion_correction': False,
                'update_azimuth_and_distance': False,
                'has_static_tf_only': True
            }]
        ),
        launch_testing.actions.ReadyToTest()
    ])


def to_timestamp(time_sec: float) -> Time:
    return Time(seconds=int(time_sec), nanoseconds=int((time_sec - int(time_sec)) * 1e9))


class TestDistortionCorrectorNode(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        rclpy.init()

    @classmethod
    def tearDownClass(cls):
        rclpy.shutdown()

    def setUp(self):
        self.node = rclpy.create_node("test_distortion_corrector_node")
        self.msg_buffer = []
        self.create_pub_sub()
        self.wait_for_ready()
    
    def wait_for_ready(self):
        time.sleep(1)

    def tearDown(self):
        self.node.destroy_node()

    def callback(self, msg: PointCloud2):
        self.msg_buffer.append(msg)

    def create_pub_sub(self):
        qos_high = QoSProfile(
            history=QoSHistoryPolicy.KEEP_ALL,
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
        )
        # Publishers
        self.twist_publisher = self.node.create_publisher(
            TwistWithCovarianceStamped,
            "/distortion_corrector_node/input/twist",
            10,
        )

        self.imu_publisher = self.node.create_publisher(
            Imu,
            "/distortion_corrector_node/input/imu",
            10,
        )

        self.pointcloud_publishers = self.node.create_publisher(
            PointCloud2,
            "/distortion_corrector_node/input/pointcloud",
            10
        )

        # create subscriber
        self.msg_buffer = []
        self.node.create_subscription(
            PointCloud2,
            "/distortion_corrector_node/output/pointcloud",
            self.callback,
            QoSProfile(
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=10,
                reliability=QoSReliabilityPolicy.BEST_EFFORT,
                durability=QoSDurabilityPolicy.VOLATILE,
            ),
        )

    def test_return_same_pointcloud_to_static_inputs(self):
        input_pointcloud = np.array(
            [
                [1.0, 0.0, 0.0],
                [0.0, 1.0, 0.0],
                [0.0, 0.0, 1.0]
            ],
            dtype=np.float32,
        )
        timestamp = to_timestamp(10.1)

        twist_msg = generate_twist_msg(
            timestamp=timestamp,
            linear=(0.0, 0.0, 0.0),
            angular=(0.0, 0.0, 0.0),
        )
        self.twist_publisher.publish(twist_msg)

        imu_msg = generate_imu_msg(
            timestamp=timestamp,
            angular_velocity=(0.0, 0.0, 0.0),
        )
        self.imu_publisher.publish(imu_msg)

        time.sleep(0.5)

        pointcloud_msg = generate_pointcloud_msg(
            timestamp=timestamp,
            time_inclement_ns=0.0,
            points=input_pointcloud,
            azimuths=[0.0] * len(input_pointcloud)
        )
        self.pointcloud_publishers.publish(pointcloud_msg)
        time.sleep(0.1)

        rclpy.spin_once(self.node, timeout_sec=0.1)

        self.assertEqual(
            len(self.msg_buffer),
            1,
            "The number of pointcloud has different number as expected.",
        )

        expected_pointcloud = np.array(
            [
                [1.0, 0.0, 0.0],
                [0.0, 1.0, 0.0],
                [0.0, 0.0, 1.0]
            ],
            dtype=np.float32,
        )

        output_pointcloud = get_output_points(self.msg_buffer[0])

        print("expected_pointcloud: \n", expected_pointcloud)
        print("output_pointcloud: \n", output_pointcloud)

        self.assertTrue(
            np.allclose(output_pointcloud, expected_pointcloud, atol=1e-3),
            "The distortion corrector node have weird output",
        )

    def test_return_corrected_pointcloud_to_dynamic_input(self):
        input_pointcloud = np.array(
            [
                [0.0, 0.0, 0.0],  
                [0.0, 0.0, 0.0],  
                [10.0, 0.0, 1.0], 
                [5.0, -5.0, 2.0], 
                [0.0, -10.0, 3.0],
                [-5.0, -5.0, 4.0],
                [-10.0, 0.0, 5.0],
                [-5.0, 5.0, -5.0],
                [0.0, 10.0, -4.0],
                [5.0, 5.0, -3.0],
            ],
            dtype=np.float32,
        )
        azimuths = [math.atan2(p[1], p[0]) for p in input_pointcloud]
        pointcloud_timestamp = 10.10
        pointcloud_time_inclement_ns = 10000000
        twist_linear_x_list = [10.0, 12.0, 14.0, 16.0, 18.0, 20.0]
        twist_angular_z_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
        twist_timestamp_list = [10.095, 10.119, 10.143, 10.167, 10.191, 10.215]
        imu_angular_velocity_z_list = [0.05, 0.055, 0.06, 0.065, 0.07, 0.075]
        imu_timestamp_list = [10.09, 10.117, 10.144, 10.171, 10.198, 10.225]

        for l, a, t in zip(twist_linear_x_list, twist_angular_z_list, twist_timestamp_list):
            twist_msg = generate_twist_msg(
                timestamp=to_timestamp(t),
                linear=(l, 0.0, 0.0),
                angular=(0.0, 0.0, a),
            )
            time.sleep(0.01)
            self.twist_publisher.publish(twist_msg)
        
        for a, t in zip(imu_angular_velocity_z_list, imu_timestamp_list):
            imu_msg = generate_imu_msg(
                timestamp=to_timestamp(t),
                angular_velocity=(0.0, 0.0, a),
            )
            time.sleep(0.01)
            self.imu_publisher.publish(imu_msg)

        time.sleep(0.5)

        pointcloud_msg = generate_pointcloud_msg(
            timestamp=to_timestamp(pointcloud_timestamp),
            time_inclement_ns=pointcloud_time_inclement_ns,
            points=input_pointcloud,
            azimuths=azimuths
        )
        self.pointcloud_publishers.publish(pointcloud_msg)
        time.sleep(0.01)

        rclpy.spin_once(self.node, timeout_sec=0.1)

        self.assertEqual(
            len(self.msg_buffer),
            1,
            "The number of pointcloud has different number as expected.",
        )

        expected_pointcloud = np.array(
            [
                [ 0.0000000e+00,  0.0000000e+00,  0.0000000e+00],
                [ 1.1999998e-01,  6.3276704e-05,  0.0000000e+00],
                [ 1.0259994e+01,  1.1729198e-02,  1.0000000e+00],
                [ 5.4086208e+00, -4.9908915e+00,  2.0000000e+00],
                [ 5.6396770e-01, -9.9991693e+00,  3.0000000e+00],
                [-4.2851191e+00, -5.0135536e+00,  4.0000000e+00],
                [-9.1399364e+00, -3.4564346e-02,  5.0000000e+00],
                [-3.9815292e+00,  4.9810262e+00, -5.0000000e+00],
                [ 1.1701394e+00,  1.0003417e+01, -4.0000000e+00],
                [ 6.3716283e+00,  5.0327711e+00, -3.0000000e+00],
            ],
            dtype=np.float32,
        )

        output_pointcloud = get_output_points(self.msg_buffer[0])

        print("expected_pointcloud: \n", expected_pointcloud)
        print("output_pointcloud: \n", output_pointcloud)

        self.assertTrue(
            np.allclose(output_pointcloud, expected_pointcloud, atol=1e-3),
            "The distortion corrector node have weird output",
        )

Test Item 2: CPU Usage Improvement

Performance was measured using the perf command. Topics were published using a rosbag.

Measurement commands:

// Terminal 1
~/autoware$ ros2 run autoware_pointcloud_preprocessor distortion_corrector_node --ros-args --params-file src/universe/autoware.universe/sensing/autoware_pointcloud_preprocessor/config/distortion_corrector_node.param.yaml -r /distortion_corrector_node/input/imu:=/sensing/imu/tamagawa/imu_raw -r /distortion_corrector_node/input/pointcloud:=/localization/util/downsample/pointcloud -r  /distortion_corrector_node/input/twist:=/localization/twist_estimator/twist_with_covariance

// Terminal 2
~$ ros2 run tf2_ros static_transform_publisher --x 0 --y 0 --z 0 --qx 0 --qy 0 --qz 0 --qw 1 --frame-id base_link --child-frame-id top_front_left/imu_link

//  Terminal 3
~/autoware_map$ ros2 bag play rosbag_with_high_frequency_imu_and_twist --topics /sensing/imu/tamagawa/imu_raw /localization/util/downsample/pointcloud /localization/twist_estimator/twist_with_covariance

Before change:

~$ perf stat -p `pgrep distortion` sleep -- 10

 Performance counter stats for process id '102237':

            230.00 msec task-clock                       #    0.023 CPUs utilized             
             6,219      context-switches                 #   27.039 K/sec 

After change:

~$ perf stat -p `pgrep distortion` sleep -- 10

 Performance counter stats for process id '87440':

            101.21 msec task-clock                       #    0.010 CPUs utilized             
             3,222      context-switches                 #   31.835 K/sec                     

The test results show a significant reduction (approximately 300 times/sec) in context switches due to interrupts, which led to the expected decrease in CPU usage.

Test Item 3: Rosbag Replay Simulator Functionality

Confirmed that the rosbag replay simulator functions as expected.

// Terminal 1
source ./install/local_setup.bash
ros2 launch autoware_launch  logging_simulator.launch.xml map_path:=/data/rosbag_map/universe/sample-map-rosbag   vehicle_model:=sample_vehicle sensor_model:=sample_sensor_kit

// Termianl2
source ./install/local_setup.bash
ros2 bag play /data/rosbag_map/universe/sample-rosbag/

Notes for reviewers

The queue size for the IMU and twist topics is currently hardcoded for the reason stated in the comment.

Interface changes

None.

Effects on system behavior

  • CPU usage will be improved.

…olling subscriber

Changed to read data in bulk using take to reduce subscription callback overhead.
Especially effective when the frequency of imu or twist is high, such as 100Hz.

Signed-off-by: Takahisa.Ishikawa <[email protected]>
@interimadd interimadd added type:improvement Proposed enhancement component:sensing Data acquisition from sensors, drivers, preprocessing. (auto-assigned) labels Jan 31, 2025
Copy link

github-actions bot commented Jan 31, 2025

Thank you for contributing to the Autoware project!

🚧 If your pull request is in progress, switch it to draft mode.

Please ensure:

@interimadd interimadd changed the title fix(distortion_corrector_node): replace imu and twist callback with polling subscriber feat(distortion_corrector_node): replace imu and twist callback with polling subscriber Jan 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component:sensing Data acquisition from sensors, drivers, preprocessing. (auto-assigned) type:improvement Proposed enhancement
Projects
Status: To Triage
Development

Successfully merging this pull request may close these issues.

1 participant