Skip to content

Commit

Permalink
Use async/await for getting parameter names
Browse files Browse the repository at this point in the history
  • Loading branch information
bjsowa committed Jan 13, 2025
1 parent a000226 commit 65eb744
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
7 changes: 4 additions & 3 deletions rosapi/scripts/rosapi_node
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class Rosapi(Node):
full_name = self.get_namespace() + self.get_name()
else:
full_name = self.get_namespace() + "/" + self.get_name()
params.init(full_name)
# params.init(full_name)
params._node = self
self.create_service(Topics, "/rosapi/topics", self.get_topics)
self.create_service(Interfaces, "/rosapi/interfaces", self.get_interfaces)
self.create_service(TopicsForType, "/rosapi/topics_for_type", self.get_topics_for_type)
Expand Down Expand Up @@ -284,8 +285,8 @@ class Rosapi(Node):
params.delete_param(request.node_name, request.name, self.globs.params)
return response

def get_param_names(self, request, response):
response.names = params.get_param_names(self.globs.params)
async def get_param_names(self, request, response):
response.names = await params.get_param_names(self.globs.params)
return response

def get_time(self, request, response):
Expand Down
61 changes: 31 additions & 30 deletions rosapi/src/rosapi/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from json import dumps, loads

import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rcl_interfaces.msg import Parameter, ParameterType, ParameterValue
from rcl_interfaces.srv import ListParameters
from rclpy.parameter import get_parameter_value
Expand All @@ -48,7 +49,6 @@
# Ensure thread safety for setting / getting parameters.
param_server_lock = threading.RLock()
_node = None
_parent_node_name = ""

_parameter_type_mapping = [
"",
Expand All @@ -64,22 +64,22 @@
]


def init(parent_node_name):
"""
Initializes params module with a rclpy.node.Node for further use.
This function has to be called before any other for the module to work.
"""
global _node, _parent_node_name
# TODO(@jubeira): remove this node; use rosapi node with MultiThreadedExecutor or
# async / await to prevent the service calls from blocking.
parent_node_basename = parent_node_name.split("/")[-1]
param_node_name = f"{parent_node_basename}_params"
_node = rclpy.create_node(
param_node_name,
cli_args=["--ros-args", "-r", f"__node:={param_node_name}"],
start_parameter_services=False,
)
_parent_node_name = get_absolute_node_name(parent_node_name)
async def future_with_timeout(node: rclpy.node.Node, future: rclpy.task.Future, timeout_sec: float):
first_done_future = rclpy.task.Future()

def handler(arg=None):
first_done_future.set_result(None)

timer = node.create_timer(timeout_sec, handler)
future.add_done_callback(handler)

await first_done_future

if not future.done():
raise TimeoutError(f"Future timed out after {timeout_sec} seconds")

timer.cancel()
timer.destroy()


def set_param(node_name, name, value, params_glob):
Expand Down Expand Up @@ -194,17 +194,17 @@ def delete_param(node_name, name, params_glob):
_set_param(node_name, name, None, ParameterType.PARAMETER_NOT_SET)


def get_param_names(params_glob):
async def get_param_names(params_glob):
params = []
nodes = get_nodes()

for node in nodes:
params.extend(get_node_param_names(node, params_glob))
params.extend(await get_node_param_names(node, params_glob))

return params


def get_node_param_names(node_name, params_glob):
async def get_node_param_names(node_name, params_glob):
"""Gets list of parameter names for a given node"""
node_name = get_absolute_node_name(node_name)

Expand All @@ -214,33 +214,34 @@ def get_node_param_names(node_name, params_glob):
return list(
filter(
lambda x: any(fnmatch.fnmatch(str(x), glob) for glob in params_glob),
_get_param_names(node_name),
await _get_param_names(node_name),
)
)
else:
# If there is no parameter glob, don't filter.
return _get_param_names(node_name)
return await _get_param_names(node_name)


def _get_param_names(node_name):
async def _get_param_names(node_name):
# This method is called in a service callback; calling a service of the same node
# will cause a deadlock.
global _parent_node_name
if node_name == _parent_node_name:
if node_name == _node.get_fully_qualified_name():
return []

client = _node.create_client(ListParameters, f"{node_name}/list_parameters")
client = _node.create_client(
ListParameters,
f"{node_name}/list_parameters",
callback_group=MutuallyExclusiveCallbackGroup(),
)

ready = client.wait_for_service(timeout_sec=5.0)
if not ready:
raise RuntimeError("Wait for list_parameters service timed out")

request = ListParameters.Request()
future = client.call_async(request)
if _node.executor:
_node.executor.spin_until_future_complete(future)
else:
rclpy.spin_until_future_complete(_node, future)

await future_with_timeout(_node, future, 2.0)
response = future.result()

if response is not None:
Expand Down

0 comments on commit 65eb744

Please sign in to comment.