Skip to content

ROS2 Connectors

RAI provides robust connectors for interacting with ROS 2 middleware, supporting both standard and human-robot interaction (HRI) message flows.

Connector Description Example Usage
ROS2Connector Standard connector for generic ROS 2 topics, services, and actions. connector = ROS2Connector()
ROS2HRIConnector Connector for multimodal HRI messages over ROS 2, combining ROS2BaseConnector and HRIConnector. hri_connector = ROS2HRIConnector()

ROS2Connector

The ROS2Connector is the main interface for publishing, subscribing, and calling services/actions in a ROS 2 system. It is a concrete implementation of ROS2BaseConnector for standard ROS 2 messages.

Class definition

ROS2BaseConnector class definition

ROS2Connector vs ROS2BaseConnector

ROS2Connector is a simple alias for ROS2BaseConnector. It exists mainly to provide a more intuitive and consistent class name for users, but does not add any new functionality.

rai.communication.ros2.connectors.base.ROS2BaseConnector

Bases: ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]

ROS2-specific implementation of the BaseConnector.

This connector provides functionality for ROS2 communication through topics, services, and actions, as well as TF (Transform) operations.

Parameters:

Name Type Description Default
node_name str

Name of the ROS2 node. If not provided, generates a unique name with UUID.

f'rai_ros2_connector_{str(uuid4())[-12:]}'
destroy_subscribers bool

Whether to destroy subscribers after receiving a message, by default False.

False
executor_type Literal['single_threaded', 'multi_threaded']

Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".

'multi_threaded'

Methods:

Name Description
get_topics_names_and_types

Get list of available topics and their message types.

get_services_names_and_types

Get list of available services and their types.

get_actions_names_and_types

Get list of available actions and their types.

send_message

Send a message to a specified topic.

receive_message

Receive a message from a specified topic.

wait_for_transform

Wait for a transform to become available.

get_transform

Get the transform between two frames.

create_service

Create a ROS2 service.

create_action

Create a ROS2 action server.

shutdown

Clean up resources and shut down the connector.

Notes

Threading Model: The connector creates an executor that runs in a dedicated thread. This executor processes all ROS2 callbacks and operations asynchronously.

Subscriber Lifecycle: The destroy_subscribers parameter controls subscriber cleanup behavior: - True: Subscribers are destroyed after receiving a message - Pros: Better resource utilization - Cons: Known stability issues (see: https://github.com/ros2/rclpy/issues/1142) - False (default): Subscribers remain active after message reception - Pros: More stable operation, avoids potential crashes - Cons: May lead to memory/performance overhead from inactive subscribers

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
class ROS2BaseConnector(ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]):
    """ROS2-specific implementation of the BaseConnector.

    This connector provides functionality for ROS2 communication through topics,
    services, and actions, as well as TF (Transform) operations.

    Parameters
    ----------
    node_name : str, optional
        Name of the ROS2 node. If not provided, generates a unique name with UUID.
    destroy_subscribers : bool, optional
        Whether to destroy subscribers after receiving a message, by default False.
    executor_type : Literal["single_threaded", "multi_threaded"], optional
        Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".

    Methods
    -------
    get_topics_names_and_types()
        Get list of available topics and their message types.
    get_services_names_and_types()
        Get list of available services and their types.
    get_actions_names_and_types()
        Get list of available actions and their types.
    send_message(message, target, msg_type, auto_qos_matching=True, qos_profile=None, **kwargs)
        Send a message to a specified topic.
    receive_message(source, timeout_sec=1.0, msg_type=None, auto_topic_type=True, **kwargs)
        Receive a message from a specified topic.
    wait_for_transform(tf_buffer, target_frame, source_frame, timeout_sec=1.0)
        Wait for a transform to become available.
    get_transform(target_frame, source_frame, timeout_sec=5.0)
        Get the transform between two frames.
    create_service(service_name, on_request, on_done=None, service_type, **kwargs)
        Create a ROS2 service.
    create_action(action_name, generate_feedback_callback, action_type, **kwargs)
        Create a ROS2 action server.
    shutdown()
        Clean up resources and shut down the connector.

    Notes
    -----
    Threading Model:
        The connector creates an executor that runs in a dedicated thread.
        This executor processes all ROS2 callbacks and operations asynchronously.

    Subscriber Lifecycle:
        The `destroy_subscribers` parameter controls subscriber cleanup behavior:
        - True: Subscribers are destroyed after receiving a message
            - Pros: Better resource utilization
            - Cons: Known stability issues (see: https://github.com/ros2/rclpy/issues/1142)
        - False (default): Subscribers remain active after message reception
            - Pros: More stable operation, avoids potential crashes
            - Cons: May lead to memory/performance overhead from inactive subscribers
    """

    def __init__(
        self,
        node_name: str = f"rai_ros2_connector_{str(uuid.uuid4())[-12:]}",
        destroy_subscribers: bool = False,
        executor_type: Literal["single_threaded", "multi_threaded"] = "multi_threaded",
    ):
        """Initialize the ROS2BaseConnector.

        Parameters
        ----------
        node_name : str, optional
            Name of the ROS2 node. If not provided, generates a unique name with UUID.
        destroy_subscribers : bool, optional
            Whether to destroy subscribers after receiving a message, by default False.
        executor_type : Literal["single_threaded", "multi_threaded"], optional
            Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".

        Raises
        ------
        ValueError
            If an invalid executor type is provided.
        """
        super().__init__()

        if not rclpy.ok():
            rclpy.init()
            self.logger.warning(
                "Auto-initializing ROS2, but manual initialization is recommended. "
                "For better control and predictability, call rclpy.init() or ROS2Context before creating this connector."
            )
        self._executor_type = executor_type
        self._node = Node(node_name)
        self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers)
        self._service_api = ROS2ServiceAPI(self._node)
        self._actions_api = ROS2ActionAPI(self._node)
        self._tf_buffer = Buffer(node=self._node)
        self._tf_listener = TransformListener(self._tf_buffer, self._node)

        self._executor_performance_time_delta = 1.0
        self._executor_performance_timer = self._node.create_timer(
            self._executor_performance_time_delta, self._executor_performance_callback
        )
        self._performance_warning_threshold_multiplier: Final[float] = 1.1
        self._available_executors: Final[set[str]] = {
            "MultiThreadedExecutor",
            "SingleThreadedExecutor",
        }
        if self._executor_type == "multi_threaded":
            self._executor = MultiThreadedExecutor()
        elif self._executor_type == "single_threaded":
            self._executor = SingleThreadedExecutor()
        else:
            raise ValueError(f"Invalid executor type: {self._executor_type}")

        self._executor.add_node(self._node)
        self._thread = threading.Thread(target=self._executor.spin)
        self._thread.start()
        self.last_executor_performance_time = time.time()

        # cache for last received messages
        self.last_msg: Dict[str, T] = {}

    def _executor_performance_callback(self) -> None:
        """Monitor executor performance and log warnings if it falls behind schedule.

        This callback checks if the executor is running slower than expected and logs
        a warning with suggestions for alternative executors if performance issues
        are detected.
        """
        current_time = time.time()
        time_behind = (
            current_time
            - self.last_executor_performance_time
            - self._executor_performance_time_delta
        )
        threshold = (
            self._executor_performance_time_delta
            * self._performance_warning_threshold_multiplier
        )

        if time_behind > threshold:
            alternative_executors = self._available_executors - {
                self._executor.__class__.__name__
            }

            self.logger.warning(
                f"{self._executor.__class__.__name__} is {time_behind:.2f} seconds behind. "
                f"If you see this message frequently, consider switching to {', '.join(alternative_executors)}."
            )
            self.last_executor_performance_time = current_time
        else:
            self.last_executor_performance_time = current_time

    def _last_message_callback(self, source: str, msg: T):
        """Store the last received message for a given source.

        Parameters
        ----------
        source : str
            The topic source identifier.
        msg : T
            The received message.
        """
        self.last_msg[source] = msg

    def get_topics_names_and_types(self) -> List[Tuple[str, List[str]]]:
        """Get list of available topics and their message types.

        Returns
        -------
        List[Tuple[str, List[str]]]
            List of tuples containing topic names and their corresponding message types.
        """
        return self._topic_api.get_topic_names_and_types()

    def get_services_names_and_types(self) -> List[Tuple[str, List[str]]]:
        """Get list of available services and their types.

        Returns
        -------
        List[Tuple[str, List[str]]]
            List of tuples containing service names and their corresponding types.
        """
        return self._service_api.get_service_names_and_types()

    def get_actions_names_and_types(self) -> List[Tuple[str, List[str]]]:
        """Get list of available actions and their types.

        Returns
        -------
        List[Tuple[str, List[str]]]
            List of tuples containing action names and their corresponding types.
        """
        return self._actions_api.get_action_names_and_types()

    def send_message(
        self,
        message: T,
        target: str,
        *,
        msg_type: str,
        auto_qos_matching: bool = True,
        qos_profile: Optional[QoSProfile] = None,
        **kwargs: Any,
    ):
        """Send a message to a specified topic.

        Parameters
        ----------
        message : T
            The message to send.
        target : str
            The target topic name.
        msg_type : str
            The ROS2 message type.
        auto_qos_matching : bool, optional
            Whether to automatically match QoS profiles, by default True.
        qos_profile : Optional[QoSProfile], optional
            Custom QoS profile to use, by default None.
        **kwargs : Any
            Additional keyword arguments.
        """
        self._topic_api.publish(
            topic=target,
            msg_content=message.payload,
            msg_type=msg_type,
            auto_qos_matching=auto_qos_matching,
            qos_profile=qos_profile,
        )

    def general_callback_preprocessor(self, message: Any) -> T:
        """Preprocess a raw ROS2 message into a connector message.

        Parameters
        ----------
        message : Any
            The raw ROS2 message.

        Returns
        -------
        T
            The preprocessed message.
        """
        return self.T_class(payload=message, metadata={"msg_type": str(type(message))})

    def register_callback(
        self,
        source: str,
        callback: Callable[[T | Any], None],
        raw: bool = False,
        *,
        msg_type: Optional[str] = None,
        qos_profile: Optional[QoSProfile] = None,
        auto_qos_matching: bool = True,
        **kwargs: Any,
    ) -> str:
        """Register a callback for a topic.

        Parameters
        ----------
        source : str
            The topic to subscribe to.
        callback : Callable[[T | Any], None]
            The callback function to execute when a message is received.
        raw : bool, optional
            Whether to pass raw messages to the callback, by default False.
        msg_type : Optional[str], optional
            The ROS2 message type, by default None.
        qos_profile : Optional[QoSProfile], optional
            Custom QoS profile to use, by default None.
        auto_qos_matching : bool, optional
            Whether to automatically match QoS profiles, by default True.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str
            The callback ID.
        """
        exists = self._topic_api.subscriber_exists(source)
        if not exists:
            self._topic_api.create_subscriber(
                topic=source,
                msg_type=msg_type,
                callback=partial(self.general_callback, source),
                qos_profile=qos_profile,
                auto_qos_matching=auto_qos_matching,
            )
        return super().register_callback(source, callback, raw=raw)

    def receive_message(
        self,
        source: str,
        timeout_sec: float = 1.0,
        *,
        msg_type: Optional[str] = None,
        qos_profile: Optional[QoSProfile] = None,
        auto_qos_matching: bool = True,
        **kwargs: Any,
    ) -> T:
        """Receive a message from a topic.

        Parameters
        ----------
        source : str
            The topic to receive from.
        timeout_sec : float, optional
            Timeout in seconds, by default 1.0.
        msg_type : Optional[str], optional
            The ROS2 message type, by default None.
        qos_profile : Optional[QoSProfile], optional
            Custom QoS profile to use, by default None.
        auto_qos_matching : bool, optional
            Whether to automatically match QoS profiles, by default True.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        T
            The received message.

        Raises
        ------
        TimeoutError
            If no message is received within the timeout period.
        """
        if self._topic_api.subscriber_exists(source):
            # trying to hit cache first
            if source in self.last_msg:
                if self.last_msg[source].timestamp > time.time() - timeout_sec:
                    return self.last_msg[source]
        else:
            self._topic_api.create_subscriber(
                topic=source,
                callback=partial(self.general_callback, source),
                msg_type=msg_type,
                qos_profile=qos_profile,
                auto_qos_matching=auto_qos_matching,
            )
            self.register_callback(source, partial(self._last_message_callback, source))

        start_time = time.time()
        # wait for the message to be received
        while time.time() - start_time < timeout_sec:
            if source in self.last_msg:
                return self.last_msg[source]
            time.sleep(0.1)
        else:
            raise TimeoutError(
                f"Message from {source} not received in {timeout_sec} seconds"
            )

    @staticmethod
    def wait_for_transform(
        tf_buffer: Buffer,
        target_frame: str,
        source_frame: str,
        timeout_sec: float = 1.0,
    ) -> bool:
        """Wait for a transform to become available.

        Parameters
        ----------
        tf_buffer : Buffer
            The TF buffer to check.
        target_frame : str
            The target frame.
        source_frame : str
            The source frame.
        timeout_sec : float, optional
            Timeout in seconds, by default 1.0.

        Returns
        -------
        bool
            True if the transform is available, False otherwise.
        """
        start_time = time.time()
        while time.time() - start_time < timeout_sec:
            if tf_buffer.can_transform(target_frame, source_frame, rclpy.time.Time()):
                return True
            time.sleep(0.1)
        return False

    def get_transform(
        self,
        target_frame: str,
        source_frame: str,
        timeout_sec: float = 5.0,
    ) -> TransformStamped:
        """Get the transform between two frames.

        Parameters
        ----------
        target_frame : str
            The target frame.
        source_frame : str
            The source frame.
        timeout_sec : float, optional
            Timeout in seconds, by default 5.0.

        Returns
        -------
        TransformStamped
            The transform between the frames.

        Raises
        ------
        LookupException
            If the transform is not available within the timeout period.
        """
        transform_available = self.wait_for_transform(
            self._tf_buffer, target_frame, source_frame, timeout_sec
        )
        if not transform_available:
            raise LookupException(
                f"Could not find transform from {source_frame} to {target_frame} in {timeout_sec} seconds"
            )
        transform: TransformStamped = self._tf_buffer.lookup_transform(
            target_frame,
            source_frame,
            rclpy.time.Time(),
            timeout=Duration(seconds=int(timeout_sec)),
        )

        return transform

    def create_service(
        self,
        service_name: str,
        on_request: Callable[[Any, Any], Any],
        on_done: Optional[Callable[[Any, Any], Any]] = None,
        *,
        service_type: str,
        **kwargs: Any,
    ) -> str:
        """Create a ROS2 service.

        Parameters
        ----------
        service_name : str
            The name of the service.
        on_request : Callable[[Any, Any], Any]
            Callback function to handle service requests.
        on_done : Optional[Callable[[Any, Any], Any]], optional
            Callback function called when service is terminated, by default None.
        service_type : str
            The ROS2 service type.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str
            The service handle.
        """
        return self._service_api.create_service(
            service_name=service_name,
            callback=on_request,
            service_type=service_type,
            **kwargs,
        )

    def create_action(
        self,
        action_name: str,
        generate_feedback_callback: Callable,
        *,
        action_type: str,
        **kwargs: Any,
    ) -> str:
        """Create a ROS2 action server.

        Parameters
        ----------
        action_name : str
            The name of the action.
        generate_feedback_callback : Callable
            Callback function to generate feedback during action execution.
        action_type : str
            The ROS2 action type.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str
            The action handle.
        """
        return self._actions_api.create_action_server(
            action_name=action_name,
            action_type=action_type,
            execute_callback=generate_feedback_callback,
            **kwargs,
        )

    @property
    def node(self) -> Node:
        """Get the ROS2 node.

        Returns
        -------
        Node
            The ROS2 node instance.
        """
        return self._node

    def shutdown(self):
        """Shutdown the connector and clean up resources.

        This method:
        1. Unregisters the TF listener
        2. Destroys the ROS2 node
        3. Shuts down the action API
        4. Shuts down the topic API
        5. Shuts down the executor
        6. Joins the executor thread
        """
        self._tf_listener.unregister()
        self._node.destroy_node()
        self._actions_api.shutdown()
        self._topic_api.shutdown()
        self._executor.shutdown()
        self._thread.join()

node property

Get the ROS2 node.

Returns:

Type Description
Node

The ROS2 node instance.

__init__(node_name=f'rai_ros2_connector_{str(uuid.uuid4())[-12:]}', destroy_subscribers=False, executor_type='multi_threaded')

Initialize the ROS2BaseConnector.

Parameters:

Name Type Description Default
node_name str

Name of the ROS2 node. If not provided, generates a unique name with UUID.

f'rai_ros2_connector_{str(uuid4())[-12:]}'
destroy_subscribers bool

Whether to destroy subscribers after receiving a message, by default False.

False
executor_type Literal['single_threaded', 'multi_threaded']

Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".

'multi_threaded'

Raises:

Type Description
ValueError

If an invalid executor type is provided.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def __init__(
    self,
    node_name: str = f"rai_ros2_connector_{str(uuid.uuid4())[-12:]}",
    destroy_subscribers: bool = False,
    executor_type: Literal["single_threaded", "multi_threaded"] = "multi_threaded",
):
    """Initialize the ROS2BaseConnector.

    Parameters
    ----------
    node_name : str, optional
        Name of the ROS2 node. If not provided, generates a unique name with UUID.
    destroy_subscribers : bool, optional
        Whether to destroy subscribers after receiving a message, by default False.
    executor_type : Literal["single_threaded", "multi_threaded"], optional
        Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".

    Raises
    ------
    ValueError
        If an invalid executor type is provided.
    """
    super().__init__()

    if not rclpy.ok():
        rclpy.init()
        self.logger.warning(
            "Auto-initializing ROS2, but manual initialization is recommended. "
            "For better control and predictability, call rclpy.init() or ROS2Context before creating this connector."
        )
    self._executor_type = executor_type
    self._node = Node(node_name)
    self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers)
    self._service_api = ROS2ServiceAPI(self._node)
    self._actions_api = ROS2ActionAPI(self._node)
    self._tf_buffer = Buffer(node=self._node)
    self._tf_listener = TransformListener(self._tf_buffer, self._node)

    self._executor_performance_time_delta = 1.0
    self._executor_performance_timer = self._node.create_timer(
        self._executor_performance_time_delta, self._executor_performance_callback
    )
    self._performance_warning_threshold_multiplier: Final[float] = 1.1
    self._available_executors: Final[set[str]] = {
        "MultiThreadedExecutor",
        "SingleThreadedExecutor",
    }
    if self._executor_type == "multi_threaded":
        self._executor = MultiThreadedExecutor()
    elif self._executor_type == "single_threaded":
        self._executor = SingleThreadedExecutor()
    else:
        raise ValueError(f"Invalid executor type: {self._executor_type}")

    self._executor.add_node(self._node)
    self._thread = threading.Thread(target=self._executor.spin)
    self._thread.start()
    self.last_executor_performance_time = time.time()

    # cache for last received messages
    self.last_msg: Dict[str, T] = {}

create_action(action_name, generate_feedback_callback, *, action_type, **kwargs)

Create a ROS2 action server.

Parameters:

Name Type Description Default
action_name str

The name of the action.

required
generate_feedback_callback Callable

Callback function to generate feedback during action execution.

required
action_type str

The ROS2 action type.

required
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
str

The action handle.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
def create_action(
    self,
    action_name: str,
    generate_feedback_callback: Callable,
    *,
    action_type: str,
    **kwargs: Any,
) -> str:
    """Create a ROS2 action server.

    Parameters
    ----------
    action_name : str
        The name of the action.
    generate_feedback_callback : Callable
        Callback function to generate feedback during action execution.
    action_type : str
        The ROS2 action type.
    **kwargs : Any
        Additional keyword arguments.

    Returns
    -------
    str
        The action handle.
    """
    return self._actions_api.create_action_server(
        action_name=action_name,
        action_type=action_type,
        execute_callback=generate_feedback_callback,
        **kwargs,
    )

create_service(service_name, on_request, on_done=None, *, service_type, **kwargs)

Create a ROS2 service.

Parameters:

Name Type Description Default
service_name str

The name of the service.

required
on_request Callable[[Any, Any], Any]

Callback function to handle service requests.

required
on_done Optional[Callable[[Any, Any], Any]]

Callback function called when service is terminated, by default None.

None
service_type str

The ROS2 service type.

required
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
str

The service handle.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
def create_service(
    self,
    service_name: str,
    on_request: Callable[[Any, Any], Any],
    on_done: Optional[Callable[[Any, Any], Any]] = None,
    *,
    service_type: str,
    **kwargs: Any,
) -> str:
    """Create a ROS2 service.

    Parameters
    ----------
    service_name : str
        The name of the service.
    on_request : Callable[[Any, Any], Any]
        Callback function to handle service requests.
    on_done : Optional[Callable[[Any, Any], Any]], optional
        Callback function called when service is terminated, by default None.
    service_type : str
        The ROS2 service type.
    **kwargs : Any
        Additional keyword arguments.

    Returns
    -------
    str
        The service handle.
    """
    return self._service_api.create_service(
        service_name=service_name,
        callback=on_request,
        service_type=service_type,
        **kwargs,
    )

general_callback_preprocessor(message)

Preprocess a raw ROS2 message into a connector message.

Parameters:

Name Type Description Default
message Any

The raw ROS2 message.

required

Returns:

Type Description
T

The preprocessed message.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def general_callback_preprocessor(self, message: Any) -> T:
    """Preprocess a raw ROS2 message into a connector message.

    Parameters
    ----------
    message : Any
        The raw ROS2 message.

    Returns
    -------
    T
        The preprocessed message.
    """
    return self.T_class(payload=message, metadata={"msg_type": str(type(message))})

get_actions_names_and_types()

Get list of available actions and their types.

Returns:

Type Description
List[Tuple[str, List[str]]]

List of tuples containing action names and their corresponding types.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
223
224
225
226
227
228
229
230
231
def get_actions_names_and_types(self) -> List[Tuple[str, List[str]]]:
    """Get list of available actions and their types.

    Returns
    -------
    List[Tuple[str, List[str]]]
        List of tuples containing action names and their corresponding types.
    """
    return self._actions_api.get_action_names_and_types()

get_services_names_and_types()

Get list of available services and their types.

Returns:

Type Description
List[Tuple[str, List[str]]]

List of tuples containing service names and their corresponding types.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
213
214
215
216
217
218
219
220
221
def get_services_names_and_types(self) -> List[Tuple[str, List[str]]]:
    """Get list of available services and their types.

    Returns
    -------
    List[Tuple[str, List[str]]]
        List of tuples containing service names and their corresponding types.
    """
    return self._service_api.get_service_names_and_types()

get_topics_names_and_types()

Get list of available topics and their message types.

Returns:

Type Description
List[Tuple[str, List[str]]]

List of tuples containing topic names and their corresponding message types.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
203
204
205
206
207
208
209
210
211
def get_topics_names_and_types(self) -> List[Tuple[str, List[str]]]:
    """Get list of available topics and their message types.

    Returns
    -------
    List[Tuple[str, List[str]]]
        List of tuples containing topic names and their corresponding message types.
    """
    return self._topic_api.get_topic_names_and_types()

get_transform(target_frame, source_frame, timeout_sec=5.0)

Get the transform between two frames.

Parameters:

Name Type Description Default
target_frame str

The target frame.

required
source_frame str

The source frame.

required
timeout_sec float

Timeout in seconds, by default 5.0.

5.0

Returns:

Type Description
TransformStamped

The transform between the frames.

Raises:

Type Description
LookupException

If the transform is not available within the timeout period.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_transform(
    self,
    target_frame: str,
    source_frame: str,
    timeout_sec: float = 5.0,
) -> TransformStamped:
    """Get the transform between two frames.

    Parameters
    ----------
    target_frame : str
        The target frame.
    source_frame : str
        The source frame.
    timeout_sec : float, optional
        Timeout in seconds, by default 5.0.

    Returns
    -------
    TransformStamped
        The transform between the frames.

    Raises
    ------
    LookupException
        If the transform is not available within the timeout period.
    """
    transform_available = self.wait_for_transform(
        self._tf_buffer, target_frame, source_frame, timeout_sec
    )
    if not transform_available:
        raise LookupException(
            f"Could not find transform from {source_frame} to {target_frame} in {timeout_sec} seconds"
        )
    transform: TransformStamped = self._tf_buffer.lookup_transform(
        target_frame,
        source_frame,
        rclpy.time.Time(),
        timeout=Duration(seconds=int(timeout_sec)),
    )

    return transform

receive_message(source, timeout_sec=1.0, *, msg_type=None, qos_profile=None, auto_qos_matching=True, **kwargs)

Receive a message from a topic.

Parameters:

Name Type Description Default
source str

The topic to receive from.

required
timeout_sec float

Timeout in seconds, by default 1.0.

1.0
msg_type Optional[str]

The ROS2 message type, by default None.

None
qos_profile Optional[QoSProfile]

Custom QoS profile to use, by default None.

None
auto_qos_matching bool

Whether to automatically match QoS profiles, by default True.

True
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
T

The received message.

Raises:

Type Description
TimeoutError

If no message is received within the timeout period.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def receive_message(
    self,
    source: str,
    timeout_sec: float = 1.0,
    *,
    msg_type: Optional[str] = None,
    qos_profile: Optional[QoSProfile] = None,
    auto_qos_matching: bool = True,
    **kwargs: Any,
) -> T:
    """Receive a message from a topic.

    Parameters
    ----------
    source : str
        The topic to receive from.
    timeout_sec : float, optional
        Timeout in seconds, by default 1.0.
    msg_type : Optional[str], optional
        The ROS2 message type, by default None.
    qos_profile : Optional[QoSProfile], optional
        Custom QoS profile to use, by default None.
    auto_qos_matching : bool, optional
        Whether to automatically match QoS profiles, by default True.
    **kwargs : Any
        Additional keyword arguments.

    Returns
    -------
    T
        The received message.

    Raises
    ------
    TimeoutError
        If no message is received within the timeout period.
    """
    if self._topic_api.subscriber_exists(source):
        # trying to hit cache first
        if source in self.last_msg:
            if self.last_msg[source].timestamp > time.time() - timeout_sec:
                return self.last_msg[source]
    else:
        self._topic_api.create_subscriber(
            topic=source,
            callback=partial(self.general_callback, source),
            msg_type=msg_type,
            qos_profile=qos_profile,
            auto_qos_matching=auto_qos_matching,
        )
        self.register_callback(source, partial(self._last_message_callback, source))

    start_time = time.time()
    # wait for the message to be received
    while time.time() - start_time < timeout_sec:
        if source in self.last_msg:
            return self.last_msg[source]
        time.sleep(0.1)
    else:
        raise TimeoutError(
            f"Message from {source} not received in {timeout_sec} seconds"
        )

register_callback(source, callback, raw=False, *, msg_type=None, qos_profile=None, auto_qos_matching=True, **kwargs)

Register a callback for a topic.

Parameters:

Name Type Description Default
source str

The topic to subscribe to.

required
callback Callable[[T | Any], None]

The callback function to execute when a message is received.

required
raw bool

Whether to pass raw messages to the callback, by default False.

False
msg_type Optional[str]

The ROS2 message type, by default None.

None
qos_profile Optional[QoSProfile]

Custom QoS profile to use, by default None.

None
auto_qos_matching bool

Whether to automatically match QoS profiles, by default True.

True
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
str

The callback ID.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def register_callback(
    self,
    source: str,
    callback: Callable[[T | Any], None],
    raw: bool = False,
    *,
    msg_type: Optional[str] = None,
    qos_profile: Optional[QoSProfile] = None,
    auto_qos_matching: bool = True,
    **kwargs: Any,
) -> str:
    """Register a callback for a topic.

    Parameters
    ----------
    source : str
        The topic to subscribe to.
    callback : Callable[[T | Any], None]
        The callback function to execute when a message is received.
    raw : bool, optional
        Whether to pass raw messages to the callback, by default False.
    msg_type : Optional[str], optional
        The ROS2 message type, by default None.
    qos_profile : Optional[QoSProfile], optional
        Custom QoS profile to use, by default None.
    auto_qos_matching : bool, optional
        Whether to automatically match QoS profiles, by default True.
    **kwargs : Any
        Additional keyword arguments.

    Returns
    -------
    str
        The callback ID.
    """
    exists = self._topic_api.subscriber_exists(source)
    if not exists:
        self._topic_api.create_subscriber(
            topic=source,
            msg_type=msg_type,
            callback=partial(self.general_callback, source),
            qos_profile=qos_profile,
            auto_qos_matching=auto_qos_matching,
        )
    return super().register_callback(source, callback, raw=raw)

send_message(message, target, *, msg_type, auto_qos_matching=True, qos_profile=None, **kwargs)

Send a message to a specified topic.

Parameters:

Name Type Description Default
message T

The message to send.

required
target str

The target topic name.

required
msg_type str

The ROS2 message type.

required
auto_qos_matching bool

Whether to automatically match QoS profiles, by default True.

True
qos_profile Optional[QoSProfile]

Custom QoS profile to use, by default None.

None
**kwargs Any

Additional keyword arguments.

{}
Source code in src/rai_core/rai/communication/ros2/connectors/base.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def send_message(
    self,
    message: T,
    target: str,
    *,
    msg_type: str,
    auto_qos_matching: bool = True,
    qos_profile: Optional[QoSProfile] = None,
    **kwargs: Any,
):
    """Send a message to a specified topic.

    Parameters
    ----------
    message : T
        The message to send.
    target : str
        The target topic name.
    msg_type : str
        The ROS2 message type.
    auto_qos_matching : bool, optional
        Whether to automatically match QoS profiles, by default True.
    qos_profile : Optional[QoSProfile], optional
        Custom QoS profile to use, by default None.
    **kwargs : Any
        Additional keyword arguments.
    """
    self._topic_api.publish(
        topic=target,
        msg_content=message.payload,
        msg_type=msg_type,
        auto_qos_matching=auto_qos_matching,
        qos_profile=qos_profile,
    )

shutdown()

Shutdown the connector and clean up resources.

This method: 1. Unregisters the TF listener 2. Destroys the ROS2 node 3. Shuts down the action API 4. Shuts down the topic API 5. Shuts down the executor 6. Joins the executor thread

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
def shutdown(self):
    """Shutdown the connector and clean up resources.

    This method:
    1. Unregisters the TF listener
    2. Destroys the ROS2 node
    3. Shuts down the action API
    4. Shuts down the topic API
    5. Shuts down the executor
    6. Joins the executor thread
    """
    self._tf_listener.unregister()
    self._node.destroy_node()
    self._actions_api.shutdown()
    self._topic_api.shutdown()
    self._executor.shutdown()
    self._thread.join()

wait_for_transform(tf_buffer, target_frame, source_frame, timeout_sec=1.0) staticmethod

Wait for a transform to become available.

Parameters:

Name Type Description Default
tf_buffer Buffer

The TF buffer to check.

required
target_frame str

The target frame.

required
source_frame str

The source frame.

required
timeout_sec float

Timeout in seconds, by default 1.0.

1.0

Returns:

Type Description
bool

True if the transform is available, False otherwise.

Source code in src/rai_core/rai/communication/ros2/connectors/base.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
@staticmethod
def wait_for_transform(
    tf_buffer: Buffer,
    target_frame: str,
    source_frame: str,
    timeout_sec: float = 1.0,
) -> bool:
    """Wait for a transform to become available.

    Parameters
    ----------
    tf_buffer : Buffer
        The TF buffer to check.
    target_frame : str
        The target frame.
    source_frame : str
        The source frame.
    timeout_sec : float, optional
        Timeout in seconds, by default 1.0.

    Returns
    -------
    bool
        True if the transform is available, False otherwise.
    """
    start_time = time.time()
    while time.time() - start_time < timeout_sec:
        if tf_buffer.can_transform(target_frame, source_frame, rclpy.time.Time()):
            return True
        time.sleep(0.1)
    return False

Key Features

  • Manages ROS 2 node lifecycle and threading (via MultiThreadedExecutor, see ROS 2 Executors)
  • Supports topic-based message passing (publish/subscribe, see ROS 2 Topics)
  • Service calls (request/response, see ROS 2 Services)
  • Actions (long-running operations with feedback, see ROS 2 Actions)
  • TF (Transform) operations, see ROS 2 TF
  • Callback registration for asynchronous notifications

Example Usage

from rai.communication.ros2.connectors import ROS2Connector

connector = ROS2Connector()

# Send a message to a topic
connector.send_message(
    message=my_msg,  # ROS2Message
    target="/my_topic",
    msg_type="std_msgs/msg/String"
)

# Register a callback for a topic
connector.register_callback(
    source="/my_topic",
    callback=my_callback,
    msg_type="std_msgs/msg/String"
)

# Call a service
response = connector.service_call(
    message=my_request_msg,
    target="/my_service",
    msg_type="my_package/srv/MyService"
)

# Start an action
handle = connector.start_action(
    action_data=my_goal_msg,
    target="/my_action",
    msg_type="my_package/action/MyAction",
    on_feedback=feedback_cb,
    on_done=done_cb
)

# Get available topics
topics = connector.get_topics_names_and_types()

Node Lifecycle and Threading

The connector creates a dedicated ROS 2 node and runs it in a background thread, using a MultiThreadedExecutor for asynchronous operations. This allows for concurrent message handling and callback execution.

ROS2HRIConnector

The ROS2HRIConnector extends ROS2BaseConnector and implements the HRIConnector interface for multimodal human-robot interaction messages. It is specialized for exchanging ROS2HRIMessage objects, which can contain text, images, and audio.

Key Features

  • Publishes and subscribes to rai_interfaces/msg/HRIMessage topics
  • Converts between ROS 2 HRI messages and the internal multimodal format
  • Supports all standard connector operations (topics, services, actions)
  • Suitable for integrating AI agents with human-facing ROS 2 interfaces

Example Usage

from rai.communication.ros2.connectors import ROS2HRIConnector

hri_connector = ROS2HRIConnector()

# Send a multimodal HRI message to a topic
hri_connector.send_message(
    message=my_hri_msg,  # ROS2HRIMessage
    target="/to_human"
)

# Register a callback for incoming HRI messages
hri_connector.register_callback(
    source="/from_human",
    callback=on_human_message
)

Message Conversion

The ROS2HRIConnector automatically converts between ROS 2 rai_interfaces/msg/HRIMessage and the internal ROS2HRIMessage format for seamless multimodal communication.

Usage in Agents

Both connectors are commonly used in RAI agents to interface with ROS 2 environments. Example:

from rai.communication.ros2.connectors import ROS2Connector, ROS2HRIConnector

ros2_connector = ROS2Connector()
hri_connector = ROS2HRIConnector()

# Use with tools or agents
agent = ReActAgent(
    target_connectors={"/to_human": hri_connector},
    tools=ROS2Toolkit(connector=ros2_connector).get_tools()
)

# Subscribe to human input
agent.subscribe_source("/from_human", hri_connector)

See Also