Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(autoware_pointcloud_preprocessor): reuse collectors to reduce creation of collector and timer #10074

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ struct AdvancedCollectorInfo : public CollectorInfoBase
}
};

enum class CollectorStatus { Idle, Processing, Finished };

class CloudCollector
{
public:
Expand All @@ -67,11 +69,12 @@ class CloudCollector
std::unordered_map<std::string, sensor_msgs::msg::PointCloud2::SharedPtr>
get_topic_to_cloud_map();

[[nodiscard]] bool concatenate_finished() const;
[[nodiscard]] CollectorStatus get_status();

void set_info(std::shared_ptr<CollectorInfoBase> collector_info);
[[nodiscard]] std::shared_ptr<CollectorInfoBase> get_info() const;
void show_debug_message();
void reset();

private:
std::shared_ptr<PointCloudConcatenateDataSynchronizerComponent> ros2_parent_node_;
Expand All @@ -81,9 +84,9 @@ class CloudCollector
uint64_t num_of_clouds_;
double timeout_sec_;
bool debug_mode_;
bool concatenate_finished_{false};
std::mutex concatenate_mutex_;
std::shared_ptr<CollectorInfoBase> collector_info_;
CollectorStatus status_;
};

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
std::list<std::shared_ptr<CloudCollector>> cloud_collectors_;
std::unique_ptr<CollectorMatchingStrategy> collector_matching_strategy_;
std::mutex cloud_collectors_mutex_;
bool init_collector_list_ = false;
static constexpr const int num_of_collectors = 3;

// default postfix name for synchronized pointcloud
static constexpr const char * default_sync_topic_postfix = "_synchronized";
Expand Down Expand Up @@ -119,6 +121,7 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
void check_concat_status(diagnostic_updater::DiagnosticStatusWrapper & stat);
std::string replace_sync_topic_name_postfix(
const std::string & original_topic_name, const std::string & postfix);
void initialize_collector_list();
};

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@
timeout_sec_(timeout_sec),
debug_mode_(debug_mode)
{
status_ = CollectorStatus::Idle;

const auto period_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(timeout_sec_));

timer_ =
rclcpp::create_timer(ros2_parent_node_, ros2_parent_node_->get_clock(), period_ns, [this]() {
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (concatenate_finished_) return;
if (status_ == CollectorStatus::Finished) return;
concatenate_callback();
});

timer_->cancel();
}

void CloudCollector::set_info(std::shared_ptr<CollectorInfoBase> collector_info)
Expand All @@ -67,17 +71,26 @@
const std::string & topic_name, sensor_msgs::msg::PointCloud2::SharedPtr cloud)
{
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (concatenate_finished_) return false;

// Check if the map already contains an entry for the same topic. This shouldn't happen if the
// parameter 'lidar_timestamp_noise_window' is set correctly.
if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(),
std::chrono::milliseconds(10000).count(),
"Topic '" << topic_name
<< "' already exists in the collector. Check the timestamp of the pointcloud.");
if (status_ == CollectorStatus::Finished) {
return false;
}

if (status_ == CollectorStatus::Idle) {
// Add first pointcloud to the collector, restart the timer
status_ = CollectorStatus::Processing;
timer_->reset();
} else if (status_ == CollectorStatus::Processing) {
// Check if the map already contains an entry for the same topic. This shouldn't happen if the
// parameter 'lidar_timestamp_noise_window' is set correctly.
if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(

Check warning on line 86 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp#L86

Added line #L86 was not covered by tests
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(),
std::chrono::milliseconds(10000).count(),
"Topic '" << topic_name
<< "' already exists in the collector. Check the timestamp of the pointcloud.");
}
}

topic_to_cloud_map_[topic_name] = cloud;
if (topic_to_cloud_map_.size() == num_of_clouds_) {
concatenate_callback();
Expand All @@ -86,9 +99,10 @@
return true;
}

bool CloudCollector::concatenate_finished() const
CollectorStatus CloudCollector::get_status()
{
return concatenate_finished_;
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
return status_;
}

void CloudCollector::concatenate_callback()
Expand All @@ -102,10 +116,9 @@
timer_->cancel();

auto concatenated_cloud_result = concatenate_pointclouds(topic_to_cloud_map_);

ros2_parent_node_->publish_clouds(std::move(concatenated_cloud_result), collector_info_);

concatenate_finished_ = true;
status_ = CollectorStatus::Finished;
}

ConcatenatedCloudResult CloudCollector::concatenate_pointclouds(
Expand Down Expand Up @@ -153,4 +166,17 @@
RCLCPP_INFO(ros2_parent_node_->get_logger(), "%s", log_stream.str().c_str());
}

void CloudCollector::reset()
{
std::lock_guard<std::mutex> lock(concatenate_mutex_);

status_ = CollectorStatus::Idle; // Reset status to Idle
topic_to_cloud_map_.clear();
collector_info_ = nullptr;

if (timer_ && !timer_->is_canceled()) {
timer_->cancel();
}
}

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ ConcatenatedCloudResult CombineCloudHandler::combine_pointclouds(
{
ConcatenatedCloudResult concatenate_cloud_result;

if (topic_to_cloud_map.empty()) return concatenate_cloud_result;

std::vector<rclcpp::Time> pc_stamps;
pc_stamps.reserve(topic_to_cloud_map.size());
for (const auto & [topic, cloud] : topic_to_cloud_map) {
pc_stamps.emplace_back(cloud->header.stamp);
}
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed with @drwnz just now, it's probably a goof idea to allocate (on startup) collectors_threshold collectors and then not add/remove any during operation.
This makes it easier to reason about the program state.
(would also get rid of L251-L260)

Copy link
Contributor Author

@vividf vividf Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!!!
Fixed in 85f75e6 and 0bed7b3

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pcl_conversions/pcl_conversions.h>

#include <iomanip>
#include <limits>
#include <list>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -178,72 +179,98 @@
return replaced_topic_name;
}

void PointCloudConcatenateDataSynchronizerComponent::initialize_collector_list()
{
// Initialize collector list
for (size_t i = 0; i < num_of_collectors; ++i) {
cloud_collectors_.emplace_back(std::make_shared<CloudCollector>(
std::dynamic_pointer_cast<PointCloudConcatenateDataSynchronizerComponent>(shared_from_this()),
combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec,
params_.debug_mode));
}
init_collector_list_ = true;
}

void PointCloudConcatenateDataSynchronizerComponent::cloud_callback(
const sensor_msgs::msg::PointCloud2::SharedPtr & input_ptr, const std::string & topic_name)
{
stop_watch_ptr_->toc("processing_time", true);
if (!init_collector_list_) {
initialize_collector_list();
}

double cloud_arrival_time = this->get_clock()->now().seconds();
manage_collector_list();

if (!utils::is_data_layout_compatible_with_point_xyzirc(*input_ptr)) {
RCLCPP_ERROR(
get_logger(), "The pointcloud layout is not compatible with PointXYZIRC. Aborting");

if (utils::is_data_layout_compatible_with_point_xyzi(*input_ptr)) {
RCLCPP_ERROR(
get_logger(),
"The pointcloud layout is compatible with PointXYZI. You may be using legacy code/data");
}

return;
}

if (params_.debug_mode) {
RCLCPP_INFO(
this->get_logger(), " pointcloud %s timestamp: %lf arrive time: %lf seconds, latency: %lf",
topic_name.c_str(), rclcpp::Time(input_ptr->header.stamp).seconds(), cloud_arrival_time,
cloud_arrival_time - rclcpp::Time(input_ptr->header.stamp).seconds());
}

if (input_ptr->data.empty()) {
RCLCPP_WARN_STREAM_THROTTLE(
this->get_logger(), *this->get_clock(), 1000, "Empty sensor points!");
}

// protect cloud collectors list
std::unique_lock<std::mutex> cloud_collectors_lock(cloud_collectors_mutex_);

// For each callback, check whether there is a exist collector that matches this cloud
std::optional<std::shared_ptr<CloudCollector>> cloud_collector = std::nullopt;
MatchingParams matching_params;
matching_params.topic_name = topic_name;
matching_params.cloud_arrival_time = cloud_arrival_time;
matching_params.cloud_timestamp = rclcpp::Time(input_ptr->header.stamp).seconds();

if (!cloud_collectors_.empty()) {
cloud_collector =
collector_matching_strategy_->match_cloud_to_collector(cloud_collectors_, matching_params);
}

bool process_success = false;
if (cloud_collector.has_value()) {
auto collector = cloud_collector.value();
if (collector) {
cloud_collectors_lock.unlock();
process_success = cloud_collector.value()->process_pointcloud(topic_name, input_ptr);
process_success = collector->process_pointcloud(topic_name, input_ptr);
}
}

// Didn't find matched collector
if (!process_success) {
auto new_cloud_collector = std::make_shared<CloudCollector>(
std::dynamic_pointer_cast<PointCloudConcatenateDataSynchronizerComponent>(shared_from_this()),
combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec, params_.debug_mode);
// Reuse the collector if the status is IDLE
std::shared_ptr<CloudCollector> selected_collector = nullptr;

cloud_collectors_.push_back(new_cloud_collector);
cloud_collectors_lock.unlock();
auto it = std::find_if(
cloud_collectors_.begin(), cloud_collectors_.end(),
[](const auto & collector) { return collector->get_status() == CollectorStatus::Idle; });

collector_matching_strategy_->set_collector_info(new_cloud_collector, matching_params);
(void)new_cloud_collector->process_pointcloud(topic_name, input_ptr);
if (it != cloud_collectors_.end()) {
selected_collector = *it;
}
cloud_collectors_lock.unlock();
if (selected_collector) {
collector_matching_strategy_->set_collector_info(selected_collector, matching_params);
(void)selected_collector->process_pointcloud(topic_name, input_ptr);
} else {
// Handle case where no suitable collector is found
RCLCPP_WARN(get_logger(), "No available CloudCollector in IDLE state.");
}

Check warning on line 273 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ Getting worse: Complex Method

PointCloudConcatenateDataSynchronizerComponent::cloud_callback increases in cyclomatic complexity from 9 to 12, threshold = 9. This function has many conditional statements (e.g. if, for, while), leading to lower code health. Avoid adding more conditionals and code to it without refactoring.

Check warning on line 273 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ Getting worse: Bumpy Road Ahead

PointCloudConcatenateDataSynchronizerComponent::cloud_callback increases from 2 to 3 logical blocks with deeply nested code, threshold is one single block per function. The Bumpy Road code smell is a function that contains multiple chunks of nested conditional logic. The deeper the nesting and the more bumps, the lower the code health.
}
}

Expand Down Expand Up @@ -321,8 +348,7 @@
}
}

diagnostic_collector_info_ = collector_info;

diagnostic_collector_info_ = std::move(collector_info);
diagnostic_topic_to_original_stamp_map_ = concatenated_cloud_result.topic_to_original_stamp_map;
diagnostic_updater_.force_update();

Expand All @@ -347,11 +373,50 @@
{
std::lock_guard<std::mutex> cloud_collectors_lock(cloud_collectors_mutex_);

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end();) {
if ((*it)->concatenate_finished()) {
it = cloud_collectors_.erase(it); // Erase and move the iterator to the next element
} else {
++it; // Move to the next element
int num_processing_collectors = 0;

for (auto & collector : cloud_collectors_) {
if (collector->get_status() == CollectorStatus::Finished) {
collector->reset();
}

if (collector->get_status() == CollectorStatus::Processing) {
num_processing_collectors++;
}
}

if (num_processing_collectors == num_of_collectors) {
auto min_it = cloud_collectors_.end();
constexpr double k_max_timestamp = std::numeric_limits<double>::max();
double min_timestamp = k_max_timestamp;

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end(); ++it) {
if ((*it)->get_status() == CollectorStatus::Processing) {
auto info = (*it)->get_info();
double timestamp = k_max_timestamp;

if (auto naive_info = std::dynamic_pointer_cast<NaiveCollectorInfo>(info)) {
timestamp = naive_info->timestamp;

Check warning on line 399 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp#L399

Added line #L399 was not covered by tests
} else if (auto advanced_info = std::dynamic_pointer_cast<AdvancedCollectorInfo>(info)) {
timestamp = advanced_info->timestamp;
} else {
continue;
}

if (timestamp < min_timestamp) {
min_timestamp = timestamp;
min_it = it;
}
}
}

// Reset the collector with the oldest timestamp if found
if (min_it != cloud_collectors_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(

Check warning on line 415 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

Codecov / codecov/patch

sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp#L415

Added line #L415 was not covered by tests
this->get_logger(), *this->get_clock(), 1000,
"Reset the oldest collector because the number of processing collectors ("
<< num_processing_collectors << ") equal to the limit (" << num_of_collectors << ").");
(*min_it)->reset();

Check warning on line 419 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ New issue: Complex Method

PointCloudConcatenateDataSynchronizerComponent::manage_collector_list has a cyclomatic complexity of 12, threshold = 9. This function has many conditional statements (e.g. if, for, while), leading to lower code health. Avoid adding more conditionals and code to it without refactoring.

Check warning on line 419 in sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ New issue: Bumpy Road Ahead

PointCloudConcatenateDataSynchronizerComponent::manage_collector_list has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function. The Bumpy Road code smell is a function that contains multiple chunks of nested conditional logic. The deeper the nesting and the more bumps, the lower the code health.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,18 +485,21 @@ TEST_F(ConcatenateCloudTest, TestProcessSingleCloud)

auto topic_to_cloud_map = collector_->get_topic_to_cloud_map();
EXPECT_EQ(topic_to_cloud_map["lidar_top"], top_pointcloud_ptr);
EXPECT_FALSE(collector_->concatenate_finished());
EXPECT_EQ(
collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Processing);
concatenate_node_->manage_collector_list();
EXPECT_FALSE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(concatenate_node_->get_cloud_collectors().size(), 1);

// Sleep for timeout seconds (200 ms)
std::this_thread::sleep_for(std::chrono::milliseconds(200));
rclcpp::spin_some(concatenate_node_);

// Collector should concatenate and publish the pointcloud, also delete itself.
EXPECT_TRUE(collector_->concatenate_finished());
// Collector should concatenate and publish the pointcloud, and set the status to IDLE
EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished);
concatenate_node_->manage_collector_list();
EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(
concatenate_node_->get_cloud_collectors().front()->get_status(),
autoware::pointcloud_preprocessor::CollectorStatus::Idle);
}

TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud)
Expand Down Expand Up @@ -524,9 +527,11 @@ TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud)
collector_->process_pointcloud("lidar_left", left_pointcloud_ptr);
collector_->process_pointcloud("lidar_right", right_pointcloud_ptr);

EXPECT_TRUE(collector_->concatenate_finished());
EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished);
concatenate_node_->manage_collector_list();
EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(
concatenate_node_->get_cloud_collectors().front()->get_status(),
autoware::pointcloud_preprocessor::CollectorStatus::Idle);
}

int main(int argc, char ** argv)
Expand Down
Loading