Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(autoware_pointcloud_preprocessor): reuse collectors to reduce creation of collector and timer #10074

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ struct AdvancedCollectorInfo : public CollectorInfoBase
}
};

enum class CollectorStatus { Idle, Processing, Finished };

class CloudCollector
{
public:
Expand All @@ -67,11 +69,13 @@ class CloudCollector
std::unordered_map<std::string, sensor_msgs::msg::PointCloud2::SharedPtr>
get_topic_to_cloud_map();

[[nodiscard]] bool concatenate_finished() const;
[[nodiscard]] CollectorStatus get_status() const;

void set_info(std::shared_ptr<CollectorInfoBase> collector_info);
[[nodiscard]] std::shared_ptr<CollectorInfoBase> get_info() const;
void show_debug_message();
void reset();
void set_period(const int64_t new_period);

private:
std::shared_ptr<PointCloudConcatenateDataSynchronizerComponent> ros2_parent_node_;
Expand All @@ -81,9 +85,9 @@ class CloudCollector
uint64_t num_of_clouds_;
double timeout_sec_;
bool debug_mode_;
bool concatenate_finished_{false};
std::mutex concatenate_mutex_;
std::shared_ptr<CollectorInfoBase> collector_info_;
CollectorStatus status_;
};

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
std::list<std::shared_ptr<CloudCollector>> cloud_collectors_;
std::unique_ptr<CollectorMatchingStrategy> collector_matching_strategy_;
std::mutex cloud_collectors_mutex_;
static constexpr const int collectors_threshold = 10;
vividf marked this conversation as resolved.
Show resolved Hide resolved

// default postfix name for synchronized pointcloud
static constexpr const char * default_sync_topic_postfix = "_synchronized";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
timeout_sec_(timeout_sec),
debug_mode_(debug_mode)
{
status_ = CollectorStatus::Processing;
const auto period_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(timeout_sec_));

timer_ =
rclcpp::create_timer(ros2_parent_node_, ros2_parent_node_->get_clock(), period_ns, [this]() {
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (concatenate_finished_) return;
if (status_ == CollectorStatus::Finished) return;
concatenate_callback();
});
}
Expand All @@ -67,17 +68,33 @@
const std::string & topic_name, sensor_msgs::msg::PointCloud2::SharedPtr cloud)
{
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (concatenate_finished_) return false;

// Check if the map already contains an entry for the same topic. This shouldn't happen if the
// parameter 'lidar_timestamp_noise_window' is set correctly.
if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(),
std::chrono::milliseconds(10000).count(),
"Topic '" << topic_name
<< "' already exists in the collector. Check the timestamp of the pointcloud.");
if (status_ == CollectorStatus::Finished) {
return false;
}
if (status_ == CollectorStatus::Idle) {
// Add first pointcloud to the collector, restart the timer
status_ = CollectorStatus::Processing;
const auto period_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(timeout_sec_));
try {
set_period(period_ns.count());
} catch (rclcpp::exceptions::RCLError & ex) {
RCLCPP_WARN_THROTTLE(

Check warning on line 82 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp#L82

Added line #L82 was not covered by tests
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(), 5000, "%s", ex.what());
}
timer_->reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? It seems to me reading the RCL source code that the period is unaffected by cancel() and so just calling reset() without re-setting the period should be fine, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, thanks for pointing out!
cbb94ec

} else if (status_ == CollectorStatus::Processing) {
// Check if the map already contains an entry for the same topic. This shouldn't happen if the
// parameter 'lidar_timestamp_noise_window' is set correctly.
if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(

Check warning on line 90 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp#L90

Added line #L90 was not covered by tests
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(),
std::chrono::milliseconds(10000).count(),
"Topic '" << topic_name
<< "' already exists in the collector. Check the timestamp of the pointcloud.");
}
}
vividf marked this conversation as resolved.
Show resolved Hide resolved

topic_to_cloud_map_[topic_name] = cloud;
if (topic_to_cloud_map_.size() == num_of_clouds_) {
concatenate_callback();
Expand All @@ -86,9 +103,25 @@
return true;
}

bool CloudCollector::concatenate_finished() const
void CloudCollector::set_period(const int64_t new_period)
{
if (!timer_) {
return;

Check warning on line 109 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp#L109

Added line #L109 was not covered by tests
}
int64_t old_period = 0;
rcl_ret_t ret = rcl_timer_get_period(timer_->get_timer_handle().get(), &old_period);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't get old period");

Check warning on line 114 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp#L114

Added line #L114 was not covered by tests
}
ret = rcl_timer_exchange_period(timer_->get_timer_handle().get(), new_period, &old_period);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't exchange_period");

Check warning on line 118 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp#L118

Added line #L118 was not covered by tests
}
}

CollectorStatus CloudCollector::get_status() const
{
return concatenate_finished_;
return status_;
}

void CloudCollector::concatenate_callback()
Expand All @@ -105,7 +138,7 @@

ros2_parent_node_->publish_clouds(std::move(concatenated_cloud_result), collector_info_);

concatenate_finished_ = true;
status_ = CollectorStatus::Finished;
}

ConcatenatedCloudResult CloudCollector::concatenate_pointclouds(
Expand Down Expand Up @@ -153,4 +186,17 @@
RCLCPP_INFO(ros2_parent_node_->get_logger(), "%s", log_stream.str().c_str());
}

void CloudCollector::reset()
{
std::lock_guard<std::mutex> lock(concatenate_mutex_);

status_ = CollectorStatus::Idle; // Reset status to Idle
topic_to_cloud_map_.clear();
collector_info_ = nullptr;

if (timer_ && !timer_->is_canceled()) {
timer_->cancel();
}
}

} // namespace autoware::pointcloud_preprocessor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed with @drwnz just now, it's probably a goof idea to allocate (on startup) collectors_threshold collectors and then not add/remove any during operation.
This makes it easier to reason about the program state.
(would also get rid of L251-L260)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!!!
Fixed in 85f75e6

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 TIER IV, Inc.

Check notice on line 1 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

ℹ Getting worse: Overall Code Complexity

The mean cyclomatic complexity increases from 5.73 to 6.09, threshold = 4. This file has many conditional statements (e.g. if, for, while) across its implementation, leading to lower code health. Avoid adding more conditionals.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -230,20 +230,36 @@
auto collector = cloud_collector.value();
if (collector) {
cloud_collectors_lock.unlock();
process_success = cloud_collector.value()->process_pointcloud(topic_name, input_ptr);
process_success = collector->process_pointcloud(topic_name, input_ptr);
}
}

// Didn't find matched collector, create a new collector
if (!process_success) {
auto new_cloud_collector = std::make_shared<CloudCollector>(
std::dynamic_pointer_cast<PointCloudConcatenateDataSynchronizerComponent>(shared_from_this()),
combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec, params_.debug_mode);
// Reuse the collector if the status is IDLE
std::shared_ptr<CloudCollector> selected_collector = nullptr;

cloud_collectors_.push_back(new_cloud_collector);
cloud_collectors_lock.unlock();
for (auto & collector : cloud_collectors_) {
if (collector->get_status() == CollectorStatus::Idle) {
selected_collector = collector;
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be expressed as a std::find_if instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! fixed in c99f1fe


if (!selected_collector) {
// If no idle collector exists, create a new collector
selected_collector = std::make_shared<CloudCollector>(
std::dynamic_pointer_cast<PointCloudConcatenateDataSynchronizerComponent>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dynamic creation of a new collector may not be ideal. How about we have a fixed number of collectors that are initialized, and then use the first idle one - if none are idle, kick the oldest one and reuse.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!!!
Fixed in 85f75e6

shared_from_this()),
combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec,
params_.debug_mode);

cloud_collectors_.push_back(selected_collector);
}

collector_matching_strategy_->set_collector_info(new_cloud_collector, matching_params);
(void)new_cloud_collector->process_pointcloud(topic_name, input_ptr);
cloud_collectors_lock.unlock();
collector_matching_strategy_->set_collector_info(selected_collector, matching_params);
(void)selected_collector->process_pointcloud(topic_name, input_ptr);

Check warning on line 262 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ Getting worse: Complex Method

PointCloudConcatenateDataSynchronizerComponent::cloud_callback increases in cyclomatic complexity from 9 to 12, threshold = 9. This function has many conditional statements (e.g. if, for, while), leading to lower code health. Avoid adding more conditionals and code to it without refactoring.

Check notice on line 262 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

ℹ Getting worse: Bumpy Road Ahead

PointCloudConcatenateDataSynchronizerComponent::cloud_callback increases from 2 to 3 logical blocks with deeply nested code, threshold is one single block per function. The Bumpy Road code smell is a function that contains multiple chunks of nested conditional logic. The deeper the nesting and the more bumps, the lower the code health.
}
}

Expand Down Expand Up @@ -321,8 +337,7 @@
}
}

diagnostic_collector_info_ = collector_info;

diagnostic_collector_info_ = std::move(collector_info);
diagnostic_topic_to_original_stamp_map_ = concatenated_cloud_result.topic_to_original_stamp_map;
diagnostic_updater_.force_update();

Expand All @@ -347,13 +362,19 @@
{
std::lock_guard<std::mutex> cloud_collectors_lock(cloud_collectors_mutex_);

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end();) {
if ((*it)->concatenate_finished()) {
it = cloud_collectors_.erase(it); // Erase and move the iterator to the next element
} else {
++it; // Move to the next element
for (auto & collector : cloud_collectors_) {
if (collector->get_status() == CollectorStatus::Finished) {
collector->reset();
}
}

if (cloud_collectors_.size() > collectors_threshold) {
RCLCPP_WARN_STREAM_THROTTLE(

Check warning on line 372 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp#L372

Added line #L372 was not covered by tests
get_logger(), *get_clock(), 1000,
"The number of cloud collectors (" << cloud_collectors_.size() << ") exceeds the threshold ("
<< collectors_threshold
<< "), be careful if it keeps increasing.");
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like this to be a hard limit. In production, nobody will look at those logs, so the memory footprint could increase forever.

Copy link
Contributor Author

@vividf vividf Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!
Fixed with resetting collector that has the oldest timestamp:
81ec941 and 3375c43

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix logging: a8444cb


std::string PointCloudConcatenateDataSynchronizerComponent::format_timestamp(double timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,18 +485,21 @@ TEST_F(ConcatenateCloudTest, TestProcessSingleCloud)

auto topic_to_cloud_map = collector_->get_topic_to_cloud_map();
EXPECT_EQ(topic_to_cloud_map["lidar_top"], top_pointcloud_ptr);
EXPECT_FALSE(collector_->concatenate_finished());
EXPECT_EQ(
collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Processing);
concatenate_node_->manage_collector_list();
EXPECT_FALSE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(concatenate_node_->get_cloud_collectors().size(), 1);

// Sleep for timeout seconds (200 ms)
std::this_thread::sleep_for(std::chrono::milliseconds(200));
rclcpp::spin_some(concatenate_node_);

// Collector should concatenate and publish the pointcloud, also delete itself.
EXPECT_TRUE(collector_->concatenate_finished());
// Collector should concatenate and publish the pointcloud, and set the status to IDLE
EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished);
concatenate_node_->manage_collector_list();
EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(
concatenate_node_->get_cloud_collectors().front()->get_status(),
autoware::pointcloud_preprocessor::CollectorStatus::Idle);
}

TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud)
Expand Down Expand Up @@ -524,9 +527,11 @@ TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud)
collector_->process_pointcloud("lidar_left", left_pointcloud_ptr);
collector_->process_pointcloud("lidar_right", right_pointcloud_ptr);

EXPECT_TRUE(collector_->concatenate_finished());
EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished);
concatenate_node_->manage_collector_list();
EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(
concatenate_node_->get_cloud_collectors().front()->get_status(),
autoware::pointcloud_preprocessor::CollectorStatus::Idle);
}

int main(int argc, char ** argv)
Expand Down
Loading