Skip to content

Commit

Permalink
refactor: general fixes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Andeshog committed Jan 26, 2025
1 parent b349dd7 commit 635489d
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ joystick_interface_auv:
wrench: /thrust/wrench_input
guidance: /dp/reference
killswitch: /orca/killswitch
mode: /orca/operation_mode
mode: /orca/operation_mode
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
#!/usr/bin/python3

import rclpy
from geometry_msgs.msg import PoseWithCovarianceStamped, Wrench
from joystick_utils import JoyStates, Wired, WirelessXboxSeriesX
from rclpy.node import Node
from geometry_msgs.msg import Wrench
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import Joy
from std_msgs.msg import Bool, String
from geometry_msgs.msg import PoseWithCovarianceStamped, Wrench
from vortex_msgs.msg import ReferenceFilter
from joystick_utils import Wired, WirelessXboxSeriesX, JoyStates
from vortex_utils.python_utils import PoseData
from vortex_utils.ros_converter import pose_from_ros
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

class JoystickInterface(Node):

class JoystickInterface(Node):
def __init__(self):
super().__init__('joystick_interface_node')

Expand All @@ -30,20 +29,33 @@ def __init__(self):
self.joystick_buttons_map_ = []
self.last_button_press_time_ = 0

self.get_logger().info(f"Joystick interface node started. Current mode: {self.mode_}")
self.get_logger().info(
f"Joystick interface node started. Current mode: {self.mode_}"
)

def get_parameters(self):
gain_params = ['joystick_surge_gain', 'joystick_sway_gain', 'joystick_heave_gain', 'joystick_roll_gain', 'joystick_pitch_gain', 'joystick_yaw_gain',
'guidance_surge_gain', 'guidance_sway_gain', 'guidance_heave_gain', 'guidance_roll_gain', 'guidance_pitch_gain', 'guidance_yaw_gain',
'debounce_duration']

gain_params = [
'joystick_surge_gain',
'joystick_sway_gain',
'joystick_heave_gain',
'joystick_roll_gain',
'joystick_pitch_gain',
'joystick_yaw_gain',
'guidance_surge_gain',
'guidance_sway_gain',
'guidance_heave_gain',
'guidance_roll_gain',
'guidance_pitch_gain',
'guidance_yaw_gain',
'debounce_duration',
]

for param in gain_params:
self.declare_parameter(param, 1.0)
setattr(self, param + '_', self.get_parameter(param).value)

topic_params = ['pose', 'joy', 'wrench',
'guidance', 'killswitch', 'mode']

topic_params = ['pose', 'joy', 'wrench', 'guidance', 'killswitch', 'mode']

for param in topic_params:
self.declare_parameter(f'topics.{param}', "_")
setattr(self, param + '_topic', self.get_parameter(f'topics.{param}').value)
Expand All @@ -60,26 +72,33 @@ def set_publishers_and_subscribers(self):
best_effort_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1
depth=1,
)

self.joy_subscriber_ = self.create_subscription(
Joy, self.joy_topic, self.joystick_cb, 5)
Joy, self.joy_topic, self.joystick_cb, 5
)
self.odom_subscriber_ = self.create_subscription(
PoseWithCovarianceStamped, self.pose_topic, self.pose_cb, qos_profile=best_effort_qos)
self.wrench_publisher_ = self.create_publisher(Wrench,
self.wrench_topic, 10)
self.ref_publisher = self.create_publisher(ReferenceFilter, self.guidance_topic, 10)
PoseWithCovarianceStamped,
self.pose_topic,
self.pose_cb,
qos_profile=best_effort_qos,
)
self.wrench_publisher_ = self.create_publisher(Wrench, self.wrench_topic, 10)
self.ref_publisher = self.create_publisher(
ReferenceFilter, self.guidance_topic, 10
)
self.software_killswitch_signal_publisher_ = self.create_publisher(
Bool, self.killswitch_topic, 10)
self.software_killswitch_signal_publisher_.publish(
Bool(data=True))
Bool, self.killswitch_topic, 10
)
self.software_killswitch_signal_publisher_.publish(Bool(data=True))
self.operational_mode_signal_publisher_ = self.create_publisher(
String, self.mode_topic, 10)
String, self.mode_topic, 10
)

def pose_cb(self, msg: PoseWithCovarianceStamped):
self.current_state_ = pose_from_ros(msg)

def create_reference_message(self) -> ReferenceFilter:
reference_msg = ReferenceFilter()
reference_msg.header.stamp = self.get_clock().now().to_msg()
Expand All @@ -93,8 +112,7 @@ def create_reference_message(self) -> ReferenceFilter:
return reference_msg

def create_wrench_message(self) -> Wrench:
"""
Creates a 3D wrench message with the given x, y, heave, roll, pitch, and yaw values.
"""Creates a 3D wrench message with the given x, y, heave, roll, pitch, and yaw values.
Returns:
Wrench: A 3D wrench message with the given values.
Expand All @@ -109,25 +127,20 @@ def create_wrench_message(self) -> Wrench:
return wrench_msg

def transition_to_xbox_mode(self):
"""
Turns off the controller and signals that the operational mode has switched to Xbox mode.
"""
"""Turns off the controller and signals that the operational mode has switched to Xbox mode."""
self.operational_mode_signal_publisher_.publish(String(data="XBOX"))
self.mode_ = JoyStates.XBOX_MODE
self.get_logger().info("XBOX mode")


def transition_to_reference_mode(self):
"""
Publishes a pose message and signals that the operational mode has switched to Reference mode.
"""
"""Publishes a pose message and signals that the operational mode has switched to Reference mode."""
self.desired_state_ = PoseData(
x=self.current_state_.x,
y=self.current_state_.y,
z=self.current_state_.z,
roll=self.current_state_.roll,
pitch=self.current_state_.pitch,
yaw=self.current_state_.yaw
yaw=self.current_state_.yaw,
)
reference_msg = self.create_reference_message()
self.operational_mode_signal_publisher_.publish(String(data="Reference mode"))
Expand All @@ -136,23 +149,19 @@ def transition_to_reference_mode(self):
self.get_logger().info("Reference mode")

def transition_to_autonomous_mode(self):
"""
Publishes a zero force wrench message and signals that the system is turning on autonomous mode.
"""
"""Publishes a zero force wrench message and signals that the system is turning on autonomous mode."""
empty_wrench_msg = Wrench()
self.wrench_publisher_.publish(empty_wrench_msg)
self.operational_mode_signal_publisher_.publish(
String(data="autonomous mode"))
self.operational_mode_signal_publisher_.publish(String(data="autonomous mode"))
self.mode_ = JoyStates.AUTONOMOUS_MODE
self.get_logger().info("autonomous mode")

def check_number_of_buttons(self, msg: Joy):
"""
Checks if the controller is wireless (has 16 buttons) or wired and sets the joystick button and axis maps accordingly.
"""Checks if the controller is wireless (has 16 buttons) or wired and sets the joystick button and axis maps accordingly.
Args:
msg: A ROS message containing the joy input data.
"""
"""
if len(msg.buttons) == 16:
self.joystick_buttons_map_ = WirelessXboxSeriesX.joystick_buttons_map_
self.joystick_axes_map_ = WirelessXboxSeriesX.joystick_axes_map_
Expand All @@ -161,12 +170,11 @@ def check_number_of_buttons(self, msg: Joy):
self.joystick_axes_map_ = Wired.joystick_axes_map_

def populate_buttons_dictionary(self, msg: Joy):
"""
Populates a dictionary with button JoyStates from the joystick message.
"""Populates a dictionary with button JoyStates from the joystick message.
Args:
msg: A ROS message containing the joy input data.
Returns:
A dictionary with button names as keys and their JoyStates as values.
"""
Expand All @@ -175,17 +183,16 @@ def populate_buttons_dictionary(self, msg: Joy):
if i < len(msg.buttons):
buttons[button_name] = msg.buttons[i]
else:
# Assuming default value if button is not present
# Assuming default value if button is not present
buttons[button_name] = 0
return buttons

def populate_axes_dictionary(self, msg: Joy):
"""
Populates a dictionary with axis values from the joystick message.
"""Populates a dictionary with axis values from the joystick message.
Args:
msg: A ROS message containing the joy input data.
Returns:
A dictionary with axis names as keys and their values as values.
"""
Expand All @@ -194,41 +201,34 @@ def populate_axes_dictionary(self, msg: Joy):
if i < len(msg.axes):
axes[axis_name] = msg.axes[i]
else:
# Assuming default value if axis is not present
# Assuming default value if axis is not present
axes[axis_name] = 0.0
return axes

def calculate_movement (self, axes: dict, buttons: dict):
"""
Calculates the movement values based on joystick input.
"""
def calculate_movement(self, axes: dict, buttons: dict):
"""Calculates the movement values based on joystick input."""
left_trigger = axes.get("LT", 0.0)
right_trigger = axes.get("RT", 0.0)
left_shoulder = buttons.get("LB", 0)
right_shoulder = buttons.get("RB", 0)

self.surge = axes.get(
"vertical_axis_left_stick", 0.0
) * self.joystick_surge_gain_
self.sway = -axes.get(
"horizontal_axis_left_stick", 0.0
) * self.joystick_sway_gain_
self.heave = (
left_trigger - right_trigger
) * self.joystick_heave_gain_
self.roll = (
right_shoulder - left_shoulder
) * self.joystick_roll_gain_
self.pitch = -axes.get(
"vertical_axis_right_stick", 0.0
) * self.joystick_pitch_gain_
self.yaw = -axes.get(
"horizontal_axis_right_stick", 0.0
) * self.joystick_yaw_gain_
self.surge = (
axes.get("vertical_axis_left_stick", 0.0) * self.joystick_surge_gain_
)
self.sway = (
-axes.get("horizontal_axis_left_stick", 0.0) * self.joystick_sway_gain_
)
self.heave = (left_trigger - right_trigger) * self.joystick_heave_gain_
self.roll = (right_shoulder - left_shoulder) * self.joystick_roll_gain_
self.pitch = (
-axes.get("vertical_axis_right_stick", 0.0) * self.joystick_pitch_gain_
)
self.yaw = (
-axes.get("horizontal_axis_right_stick", 0.0) * self.joystick_yaw_gain_
)

def handle_killswitch_button(self) -> None:
"""
Handles the software killswitch button press.
"""Handles the software killswitch button press.
This function performs the following actions based on the current state:
1. If the current state is KILLSWITCH, it signals that the killswitch is not blocking,
Expand All @@ -241,23 +241,21 @@ def handle_killswitch_button(self) -> None:
and allows it to resume operation when the killswitch is deactivated.
"""
if self.mode_ == JoyStates.KILLSWITCH:
self.software_killswitch_signal_publisher_.publish(
Bool(data=False))
self.software_killswitch_signal_publisher_.publish(Bool(data=False))
self.transition_to_xbox_mode()
return

else:
self.get_logger().info("SW killswitch")
self.software_killswitch_signal_publisher_.publish(
Bool(data=True))
self.software_killswitch_signal_publisher_.publish(Bool(data=True))
empty_wrench_msg = Wrench()
self.wrench_publisher_.publish(empty_wrench_msg)
self.mode_ = JoyStates.KILLSWITCH
return
def update_reference(self):
"""
Updates the current pose of the AUV based on joystick inputs.

def update_reference(self):
"""Updates the current pose of the AUV based on joystick inputs.
The position and orientation (roll, pitch, yaw) are updated
using the current joystick inputs scaled by their respective parameters.
"""
Expand All @@ -267,26 +265,17 @@ def update_reference(self):
self.desired_state_.roll += self.roll / self.guidance_roll_gain_
self.desired_state_.pitch += self.pitch / self.guidance_pitch_gain_
self.desired_state_.yaw += self.yaw / self.guidance_yaw_gain_

def joystick_cb(self, msg: Joy):
"""
Callback function that receives joy messages and converts them into
wrench messages to be sent to the thruster allocation node.
Handles software killswitch and control mode buttons,
and transitions between different JoyStates of operation.
This function performs the following steps:
1. Checks the number of buttons to set the correct joystick map.
2. Populates dictionaries for button and axis JoyStates.
3. Extracts specific button and axis values.
4. Calculates movement values based on joystick input.
5. Debounces button presses to prevent multiple triggers within a short duration.
6. Updates the last button press time if any button is pressed.
"""Callback function that processes joy messages and converts them into wrench messages.
This function sends wrench messages to the thrust allocation node. It handles
the software killswitch and control mode buttons and transitions between different
JoyStates of operation.
Args:
msg: A ROS message containing the joy input data.
"""

self.check_number_of_buttons(msg)

buttons: dict = self.populate_buttons_dictionary(msg)
Expand All @@ -306,7 +295,14 @@ def joystick_cb(self, msg: Joy):
software_killswitch_button = False
reference_mode_button = False

if any([software_control_mode_button, xbox_control_mode_button, software_killswitch_button, reference_mode_button]):
if any(
[
software_control_mode_button,
xbox_control_mode_button,
software_killswitch_button,
reference_mode_button,
]
):
self.last_button_press_time_ = current_time

if software_killswitch_button:
Expand All @@ -324,7 +320,7 @@ def joystick_cb(self, msg: Joy):
if xbox_control_mode_button:
self.transition_to_xbox_mode()
elif reference_mode_button:
self.transition_to_reference_mode()
self.transition_to_reference_mode()

elif self.mode_ == JoyStates.REFERENCE_MODE:
self.update_reference()
Expand All @@ -336,12 +332,14 @@ def joystick_cb(self, msg: Joy):
elif xbox_control_mode_button:
self.transition_to_xbox_mode()


def main():
rclpy.init()
joystick_interface = JoystickInterface()
rclpy.spin(joystick_interface)
joystick_interface.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
Loading

0 comments on commit 635489d

Please sign in to comment.