Connectors

Connectors are a set of abstract interfaces and implementations designed to provide a unified way to interact with various communication systems, including robot middleware like ROS2, sound devices, and other I/O systems.

Connector Architecture

The connector architecture is built on a hierarchy of abstract base classes and concrete implementations:

Base Classes

BaseConnector[T]

BaseConnector class definition

rai.communication.base_connector.BaseConnector

Bases: Generic[T]

Source code in src/rai_core/rai/communication/base_connector.py
class BaseConnector(Generic[T]):
    def __init__(self, callback_max_workers: int = 4):
        self.callback_max_workers = callback_max_workers
        self.logger = logging.getLogger(self.__class__.__name__)
        self.registered_callbacks: Dict[str, Dict[str, ParametrizedCallback[T]]] = (
            defaultdict(dict)
        )
        self.callback_id_mapping: Dict[str, tuple[str, ParametrizedCallback[T]]] = {}
        self.callback_executor = ThreadPoolExecutor(
            max_workers=self.callback_max_workers
        )

        if not hasattr(self, "__orig_bases__"):
            self.__orig_bases__ = {}
            raise ConnectorException(
                f"Error while instantiating {str(self.__class__)}: "
                "Message type T derived from BaseMessage needs to be provided"
                " e.g. Connector[MessageType]()"
            )
        self.T_class: Type[T] = get_args(self.__orig_bases__[-1])[0]

    def _generate_handle(self) -> str:
        return str(uuid4())

    def send_message(self, message: T, target: str, **kwargs: Optional[Any]) -> None:
        """Implements publish pattern.

        Sends a message to one or more subscribers. The target parameter
        can be used to specify the destination or topic.

        Parameters
        ----------
        message : T
            The message to send.
        target : str
            The destination or topic to send the message to.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Raises
        ------
        ConnectorException
            If the message cannot be sent.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def receive_message(
        self, source: str, timeout_sec: float, **kwargs: Optional[Any]
    ) -> T:
        """Implements subscribe pattern.

        Receives a message from a publisher. The source parameter
        can be used to specify the source or topic to subscribe to.

        Parameters
        ----------
        source : str
            The source or topic to receive the message from.
        timeout_sec : float
            Timeout in seconds for receiving the message.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        T
            The received message.

        Raises
        ------
        ConnectorException
            If the message cannot be received.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def register_callback(
        self,
        source: str,
        callback: Callable[[T | Any], None],
        raw: bool = False,
        **kwargs: Optional[Any],
    ) -> str:
        """Implements register callback.

        Registers a callback to be called when a message is received from a source.
        If raw is False, the callback will receive a T object.
        If raw is True, the callback will receive the raw message.

        Parameters
        ----------
        source : str
            The source to register the callback for.
        callback : Callable[[T | Any], None]
            The callback function to register.
        raw : bool, optional
            Whether to pass raw message to callback, by default False.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        str
            The ID of the registered callback.

        Raises
        ------
        ConnectorException
            If the callback cannot be registered.
        """
        parametrized_callback = ParametrizedCallback[T](callback=callback, raw=raw)
        self.registered_callbacks[source][parametrized_callback.id] = (
            parametrized_callback
        )
        self.callback_id_mapping[parametrized_callback.id] = (
            source,
            parametrized_callback,
        )
        return parametrized_callback.id

    def unregister_callback(self, callback_id: str) -> None:
        """Unregisters a callback from a source.

        Parameters
        ----------
        callback_id : str
            The id of the callback to unregister.

        Raises
        ------
        ConnectorException
            If the callback cannot be unregistered.
        """
        if callback_id not in self.callback_id_mapping:
            raise ConnectorException(f"Callback with id {callback_id} not found.")

        source, _ = self.callback_id_mapping[callback_id]
        del self.registered_callbacks[source][callback_id]
        del self.callback_id_mapping[callback_id]

    def _safe_callback_wrapper(self, callback: Callable[[T], None], message: T) -> None:
        """Safely execute a callback with error handling.

        Parameters
        ----------
        callback : Callable[[T], None]
            The callback function to execute.
        message : T
            The message to pass to the callback.
        """
        try:
            callback(message)
        except Exception as e:
            self.logger.error(f"Error in callback: {str(e)}")

    def general_callback(self, source: str, message: Any) -> None:
        processed_message = self.general_callback_preprocessor(message)
        for parametrized_callback in self.registered_callbacks.get(source, {}).values():
            payload = message if parametrized_callback.raw else processed_message
            self.callback_executor.submit(
                self._safe_callback_wrapper, parametrized_callback.callback, payload
            )

    def general_callback_preprocessor(self, message: Any) -> T:
        """Preprocessor for general callback used to transform any message to a BaseMessage.

        Parameters
        ----------
        message : Any
            The message to preprocess.

        Returns
        -------
        T
            The preprocessed message.

        Raises
        ------
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def service_call(
        self, message: T, target: str, timeout_sec: float, **kwargs: Optional[Any]
    ) -> BaseMessage:
        """Implements request-response pattern.

        Sends a request and waits for a response. The target parameter
        specifies the service endpoint to call.

        Parameters
        ----------
        message : T
            The request message to send.
        target : str
            The service endpoint to call.
        timeout_sec : float
            Timeout in seconds for the service call.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        BaseMessage
            The response message.

        Raises
        ------
        ConnectorException
            If the service call cannot be made.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def create_service(
        self,
        service_name: str,
        on_request: Callable,
        on_done: Optional[Callable] = None,
        **kwargs: Optional[Any],
    ) -> str:
        """Sets up a service endpoint for handling requests.

        Creates a service that can receive and process requests.
        The on_request callback handles incoming requests,
        and on_done (if provided) is called when the service is terminated.

        Parameters
        ----------
        service_name : str
            Name of the service to create.
        on_request : Callable
            Callback function to handle incoming requests.
        on_done : Optional[Callable], optional
            Callback function called when service is terminated, by default None.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        str
            The handle of the created service.

        Raises
        ------
        ConnectorException
            If the service cannot be created.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def create_action(
        self,
        action_name: str,
        generate_feedback_callback: Callable,
        **kwargs: Optional[Any],
    ) -> str:
        """Sets up an action endpoint for long-running operations.

        Creates an action that can be started and monitored.
        The generate_feedback_callback is used to provide progress updates
        during the action's execution.

        Parameters
        ----------
        action_name : str
            Name of the action to create.
        generate_feedback_callback : Callable
            Callback function to generate feedback during action execution.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        str
            The handle of the created action.

        Raises
        ------
        ConnectorException
            If the action cannot be created.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def start_action(
        self,
        action_data: Optional[T],
        target: str,
        on_feedback: Callable,
        on_done: Callable,
        timeout_sec: float,
        **kwargs: Optional[Any],
    ) -> str:
        """Initiates a long-running operation with feedback.

        Starts an action and provides callbacks for feedback and completion.
        The on_feedback callback receives progress updates,
        and on_done is called when the action completes.

        Parameters
        ----------
        action_data : Optional[T]
            Data to pass to the action.
        target : str
            The action endpoint to start.
        on_feedback : Callable
            Callback function to receive progress updates.
        on_done : Callable
            Callback function called when action completes.
        timeout_sec : float
            Timeout in seconds for the action.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        str
            The handle of the started action.

        Raises
        ------
        ConnectorException
            If the action cannot be started.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def terminate_action(self, action_handle: str, **kwargs: Optional[Any]) -> Any:
        """Cancels an ongoing action.

        Stops the execution of a previously started action.
        The action_handle identifies which action to terminate.

        Parameters
        ----------
        action_handle : str
            The handle of the action to terminate.
        **kwargs : Optional[Any]
            Additional keyword arguments.

        Returns
        -------
        Any
            Result of the termination operation.

        Raises
        ------
        ConnectorException
            If the action cannot be terminated.
        NotImplementedError
            If the method is not implemented by the subclass.
        """
        raise NotImplementedError("This method should be implemented by the subclass.")

    def shutdown(self):
        """Shuts down the connector and releases all resources."""
        self.callback_executor.shutdown(wait=True)
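
The constructor above derives `T_class` from the generic parameter of the subclass. The following is a minimal, self-contained sketch of that mechanism only; `SketchConnector`, `TextConnector`, and `TextMessage` are hypothetical stand-ins and not part of rai.

```python
from typing import Generic, Type, TypeVar, get_args

T = TypeVar("T")


class TextMessage:
    """Hypothetical message type used only for this sketch."""


class SketchConnector(Generic[T]):
    """Stand-in for BaseConnector that keeps only the type-resolution step."""

    def __init__(self) -> None:
        # For a subclass declared as `class X(SketchConnector[Msg])`,
        # X.__orig_bases__ is (SketchConnector[Msg],), so get_args yields (Msg,).
        self.T_class: Type[T] = get_args(self.__orig_bases__[-1])[0]


class TextConnector(SketchConnector[TextMessage]):
    """Concrete subclass that binds T to TextMessage."""


connector = TextConnector()
resolved = connector.T_class  # the class object bound to T
```

Because the lookup reads the last entry of `__orig_bases__`, the parametrized connector base is expected to come last in the subclass's base list.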

create_action(action_name, generate_feedback_callback, **kwargs)

Sets up an action endpoint for long-running operations.

Creates an action that can be started and monitored. The generate_feedback_callback is used to provide progress updates during the action's execution.

Parameters:

Name | Type | Description | Default
action_name | str | Name of the action to create. | required
generate_feedback_callback | Callable | Callback function to generate feedback during action execution. | required
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
str | The handle of the created action.

Raises:

Type | Description
ConnectorException | If the action cannot be created.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def create_action(
    self,
    action_name: str,
    generate_feedback_callback: Callable,
    **kwargs: Optional[Any],
) -> str:
    """Sets up an action endpoint for long-running operations.

    Creates an action that can be started and monitored.
    The generate_feedback_callback is used to provide progress updates
    during the action's execution.

    Parameters
    ----------
    action_name : str
        Name of the action to create.
    generate_feedback_callback : Callable
        Callback function to generate feedback during action execution.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    str
        The handle of the created action.

    Raises
    ------
    ConnectorException
        If the action cannot be created.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")

create_service(service_name, on_request, on_done=None, **kwargs)

Sets up a service endpoint for handling requests.

Creates a service that can receive and process requests. The on_request callback handles incoming requests, and on_done (if provided) is called when the service is terminated.

Parameters:

Name | Type | Description | Default
service_name | str | Name of the service to create. | required
on_request | Callable | Callback function to handle incoming requests. | required
on_done | Optional[Callable] | Callback function called when service is terminated, by default None. | None
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
str | The handle of the created service.

Raises:

Type | Description
ConnectorException | If the service cannot be created.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def create_service(
    self,
    service_name: str,
    on_request: Callable,
    on_done: Optional[Callable] = None,
    **kwargs: Optional[Any],
) -> str:
    """Sets up a service endpoint for handling requests.

    Creates a service that can receive and process requests.
    The on_request callback handles incoming requests,
    and on_done (if provided) is called when the service is terminated.

    Parameters
    ----------
    service_name : str
        Name of the service to create.
    on_request : Callable
        Callback function to handle incoming requests.
    on_done : Optional[Callable], optional
        Callback function called when service is terminated, by default None.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    str
        The handle of the created service.

    Raises
    ------
    ConnectorException
        If the service cannot be created.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")

general_callback_preprocessor(message)

Preprocessor for general callback used to transform any message to a BaseMessage.

Parameters:

Name | Type | Description | Default
message | Any | The message to preprocess. | required

Returns:

Type | Description
T | The preprocessed message.

Raises:

Type | Description
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def general_callback_preprocessor(self, message: Any) -> T:
    """Preprocessor for general callback used to transform any message to a BaseMessage.

    Parameters
    ----------
    message : Any
        The message to preprocess.

    Returns
    -------
    T
        The preprocessed message.

    Raises
    ------
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")
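
To illustrate how the preprocessor fits into callback dispatch, here is a self-contained sketch that mirrors the `general_callback` logic from the class listing: the raw message is converted once, and each callback gets either the raw or the converted payload depending on its `raw` flag. `SketchDispatcher`, `TextMessage`, and the dict-shaped raw message are assumptions made for the example, not rai types.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class TextMessage:
    """Hypothetical processed message type (plays the role of T)."""

    text: str


class SketchDispatcher:
    """Stand-in that keeps only the dispatch logic of general_callback."""

    def __init__(self) -> None:
        # source -> list of (callback, raw) pairs
        self.callbacks: Dict[str, List[Tuple[Callable[[Any], None], bool]]] = {}
        self.executor = ThreadPoolExecutor(max_workers=2)

    def register(self, source: str, callback: Callable[[Any], None], raw: bool = False) -> None:
        self.callbacks.setdefault(source, []).append((callback, raw))

    def general_callback_preprocessor(self, message: Any) -> TextMessage:
        # Subclass-specific conversion; here the raw message is assumed to be a dict.
        return TextMessage(text=message["data"])

    def general_callback(self, source: str, message: Any) -> None:
        processed = self.general_callback_preprocessor(message)
        for callback, raw in self.callbacks.get(source, []):
            payload = message if raw else processed
            self.executor.submit(callback, payload)


received: List[Any] = []
dispatcher = SketchDispatcher()
dispatcher.register("/chat", received.append)            # receives a TextMessage
dispatcher.register("/chat", received.append, raw=True)  # receives the raw dict
dispatcher.general_callback("/chat", {"data": "hello"})
dispatcher.executor.shutdown(wait=True)  # block until both callbacks have run
```

The order in which the two payloads land in `received` depends on thread scheduling, so callers should not rely on it.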

receive_message(source, timeout_sec, **kwargs)

Implements subscribe pattern.

Receives a message from a publisher. The source parameter can be used to specify the source or topic to subscribe to.

Parameters:

Name | Type | Description | Default
source | str | The source or topic to receive the message from. | required
timeout_sec | float | Timeout in seconds for receiving the message. | required
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
T | The received message.

Raises:

Type | Description
ConnectorException | If the message cannot be received.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def receive_message(
    self, source: str, timeout_sec: float, **kwargs: Optional[Any]
) -> T:
    """Implements subscribe pattern.

    Receives a message from a publisher. The source parameter
    can be used to specify the source or topic to subscribe to.

    Parameters
    ----------
    source : str
        The source or topic to receive the message from.
    timeout_sec : float
        Timeout in seconds for receiving the message.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    T
        The received message.

    Raises
    ------
    ConnectorException
        If the message cannot be received.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")

register_callback(source, callback, raw=False, **kwargs)

Implements register callback.

Registers a callback to be called when a message is received from a source. If raw is False, the callback will receive a T object. If raw is True, the callback will receive the raw message.

Parameters:

Name | Type | Description | Default
source | str | The source to register the callback for. | required
callback | Callable[[T | Any], None] | The callback function to register. | required
raw | bool | Whether to pass raw message to callback, by default False. | False
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
str | The ID of the registered callback.

Raises:

Type | Description
ConnectorException | If the callback cannot be registered.

Source code in src/rai_core/rai/communication/base_connector.py
def register_callback(
    self,
    source: str,
    callback: Callable[[T | Any], None],
    raw: bool = False,
    **kwargs: Optional[Any],
) -> str:
    """Implements register callback.

    Registers a callback to be called when a message is received from a source.
    If raw is False, the callback will receive a T object.
    If raw is True, the callback will receive the raw message.

    Parameters
    ----------
    source : str
        The source to register the callback for.
    callback : Callable[[T | Any], None]
        The callback function to register.
    raw : bool, optional
        Whether to pass raw message to callback, by default False.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    str
        The ID of the registered callback.

    Raises
    ------
    ConnectorException
        If the callback cannot be registered.
    """
    parametrized_callback = ParametrizedCallback[T](callback=callback, raw=raw)
    self.registered_callbacks[source][parametrized_callback.id] = (
        parametrized_callback
    )
    self.callback_id_mapping[parametrized_callback.id] = (
        source,
        parametrized_callback,
    )
    return parametrized_callback.id
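
The bookkeeping in `register_callback` uses two dictionaries: one keyed by source and one keyed by callback ID, so a callback can later be removed by ID alone. A reduced sketch of that idea follows; `SketchRegistry` is a hypothetical stand-in that stores plain callables instead of `ParametrizedCallback` objects and raises `KeyError` where the real class raises `ConnectorException`.

```python
from collections import defaultdict
from typing import Any, Callable, Dict
from uuid import uuid4


class SketchRegistry:
    """Stand-in that keeps only the ID bookkeeping of the connector."""

    def __init__(self) -> None:
        # source -> {callback_id -> callback}
        self.registered_callbacks: Dict[str, Dict[str, Callable[[Any], None]]] = defaultdict(dict)
        # callback_id -> source, for lookup when unregistering
        self.callback_id_mapping: Dict[str, str] = {}

    def register_callback(self, source: str, callback: Callable[[Any], None]) -> str:
        callback_id = str(uuid4())
        self.registered_callbacks[source][callback_id] = callback
        self.callback_id_mapping[callback_id] = source
        return callback_id

    def unregister_callback(self, callback_id: str) -> None:
        if callback_id not in self.callback_id_mapping:
            raise KeyError(f"Callback with id {callback_id} not found.")
        source = self.callback_id_mapping.pop(callback_id)
        del self.registered_callbacks[source][callback_id]


registry = SketchRegistry()
first_id = registry.register_callback("/camera", print)
second_id = registry.register_callback("/camera", print)
registry.unregister_callback(first_id)
remaining_ids = list(registry.registered_callbacks["/camera"])
```

Registering the same function twice yields two distinct IDs, so each registration can be removed independently.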

send_message(message, target, **kwargs)

Implements publish pattern.

Sends a message to one or more subscribers. The target parameter can be used to specify the destination or topic.

Parameters:

Name | Type | Description | Default
message | T | The message to send. | required
target | str | The destination or topic to send the message to. | required
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Raises:

Type | Description
ConnectorException | If the message cannot be sent.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def send_message(self, message: T, target: str, **kwargs: Optional[Any]) -> None:
    """Implements publish pattern.

    Sends a message to one or more subscribers. The target parameter
    can be used to specify the destination or topic.

    Parameters
    ----------
    message : T
        The message to send.
    target : str
        The destination or topic to send the message to.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Raises
    ------
    ConnectorException
        If the message cannot be sent.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")
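
Since `send_message` and `receive_message` are left to subclasses, it may help to see one possible shape of an implementation. The sketch below is an in-memory, single-process illustration built on `queue.Queue`; `InMemoryConnector` is hypothetical, uses plain strings as messages, and raises `TimeoutError` where a real connector would presumably raise `ConnectorException`.

```python
import queue
from collections import defaultdict
from typing import Dict


class InMemoryConnector:
    """Hypothetical connector that passes messages through in-process queues."""

    def __init__(self) -> None:
        # One FIFO queue per topic, created on first use.
        self._topics: Dict[str, "queue.Queue[str]"] = defaultdict(queue.Queue)

    def send_message(self, message: str, target: str) -> None:
        self._topics[target].put(message)

    def receive_message(self, source: str, timeout_sec: float) -> str:
        try:
            # Blocks for at most timeout_sec seconds waiting for a message.
            return self._topics[source].get(timeout=timeout_sec)
        except queue.Empty:
            raise TimeoutError(
                f"No message on {source!r} within {timeout_sec} s"
            ) from None


connector = InMemoryConnector()
connector.send_message("hello", target="/greetings")
reply = connector.receive_message("/greetings", timeout_sec=0.1)
```

Note that a queue delivers each message to a single receiver; a topic with several subscribers would need one queue per subscriber.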

service_call(message, target, timeout_sec, **kwargs)

Implements request-response pattern.

Sends a request and waits for a response. The target parameter specifies the service endpoint to call.

Parameters:

Name | Type | Description | Default
message | T | The request message to send. | required
target | str | The service endpoint to call. | required
timeout_sec | float | Timeout in seconds for the service call. | required
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
BaseMessage | The response message.

Raises:

Type | Description
ConnectorException | If the service call cannot be made.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def service_call(
    self, message: T, target: str, timeout_sec: float, **kwargs: Optional[Any]
) -> BaseMessage:
    """Implements request-response pattern.

    Sends a request and waits for a response. The target parameter
    specifies the service endpoint to call.

    Parameters
    ----------
    message : T
        The request message to send.
    target : str
        The service endpoint to call.
    timeout_sec : float
        Timeout in seconds for the service call.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    BaseMessage
        The response message.

    Raises
    ------
    ConnectorException
        If the service call cannot be made.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")
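
The request-response pattern pairs `create_service` on the serving side with `service_call` on the calling side. As a rough in-process illustration, under the assumption that a handler is just a function from request to response, `InMemoryServices` below (a hypothetical class, not part of rai) runs the handler on a worker thread and enforces the timeout with `Future.result`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict
from uuid import uuid4


class InMemoryServices:
    """Hypothetical in-process service registry for illustration only."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}
        self._executor = ThreadPoolExecutor(max_workers=2)

    def create_service(self, service_name: str, on_request: Callable[[Any], Any]) -> str:
        self._handlers[service_name] = on_request
        return str(uuid4())  # opaque handle for the created service

    def service_call(self, message: Any, target: str, timeout_sec: float) -> Any:
        if target not in self._handlers:
            raise LookupError(f"Unknown service: {target}")
        future = self._executor.submit(self._handlers[target], message)
        # Raises concurrent.futures.TimeoutError if no response arrives in time;
        # the handler itself keeps running in the worker thread.
        return future.result(timeout=timeout_sec)

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


services = InMemoryServices()
service_handle = services.create_service("/add_one", lambda request: request + 1)
response = services.service_call(41, target="/add_one", timeout_sec=1.0)
services.shutdown()
```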

shutdown()

Shuts down the connector and releases all resources.

Source code in src/rai_core/rai/communication/base_connector.py
def shutdown(self):
    """Shuts down the connector and releases all resources."""
    self.callback_executor.shutdown(wait=True)
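
`shutdown` delegates to `ThreadPoolExecutor.shutdown(wait=True)`, which blocks until already-submitted work has finished and rejects new submissions afterwards. The standalone snippet below demonstrates that standard-library behaviour; `slow_callback` is a hypothetical callback used only here.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

finished: List[int] = []
executor = ThreadPoolExecutor(max_workers=4)


def slow_callback(n: int) -> None:
    time.sleep(0.05)  # simulate a callback that takes a while
    finished.append(n)


for i in range(3):
    executor.submit(slow_callback, i)

# Returns only after all three pending callbacks have completed.
executor.shutdown(wait=True)

try:
    executor.submit(slow_callback, 99)
    rejected = False
except RuntimeError:
    # Submitting after shutdown is not allowed.
    rejected = True
```

In practice this means a connector should not be expected to dispatch callbacks after `shutdown` has been called.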

start_action(action_data, target, on_feedback, on_done, timeout_sec, **kwargs)

Initiates a long-running operation with feedback.

Starts an action and provides callbacks for feedback and completion. The on_feedback callback receives progress updates, and on_done is called when the action completes.

Parameters:

Name | Type | Description | Default
action_data | Optional[T] | Data to pass to the action. | required
target | str | The action endpoint to start. | required
on_feedback | Callable | Callback function to receive progress updates. | required
on_done | Callable | Callback function called when action completes. | required
timeout_sec | float | Timeout in seconds for the action. | required
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
str | The handle of the started action.

Raises:

Type | Description
ConnectorException | If the action cannot be started.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def start_action(
    self,
    action_data: Optional[T],
    target: str,
    on_feedback: Callable,
    on_done: Callable,
    timeout_sec: float,
    **kwargs: Optional[Any],
) -> str:
    """Initiates a long-running operation with feedback.

    Starts an action and provides callbacks for feedback and completion.
    The on_feedback callback receives progress updates,
    and on_done is called when the action completes.

    Parameters
    ----------
    action_data : Optional[T]
        Data to pass to the action.
    target : str
        The action endpoint to start.
    on_feedback : Callable
        Callback function to receive progress updates.
    on_done : Callable
        Callback function called when action completes.
    timeout_sec : float
        Timeout in seconds for the action.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    str
        The handle of the started action.

    Raises
    ------
    ConnectorException
        If the action cannot be started.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")

terminate_action(action_handle, **kwargs)

Cancels an ongoing action.

Stops the execution of a previously started action. The action_handle identifies which action to terminate.

Parameters:

Name | Type | Description | Default
action_handle | str | The handle of the action to terminate. | required
**kwargs | Optional[Any] | Additional keyword arguments. | {}

Returns:

Type | Description
Any | Result of the termination operation.

Raises:

Type | Description
ConnectorException | If the action cannot be terminated.
NotImplementedError | If the method is not implemented by the subclass.

Source code in src/rai_core/rai/communication/base_connector.py
def terminate_action(self, action_handle: str, **kwargs: Optional[Any]) -> Any:
    """Cancels an ongoing action.

    Stops the execution of a previously started action.
    The action_handle identifies which action to terminate.

    Parameters
    ----------
    action_handle : str
        The handle of the action to terminate.
    **kwargs : Optional[Any]
        Additional keyword arguments.

    Returns
    -------
    Any
        Result of the termination operation.

    Raises
    ------
    ConnectorException
        If the action cannot be terminated.
    NotImplementedError
        If the method is not implemented by the subclass.
    """
    raise NotImplementedError("This method should be implemented by the subclass.")
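
The action pattern combines a handle returned by `start_action`, progress delivered through `on_feedback`, a final `on_done` call, and cancellation through `terminate_action`. The sketch below shows one way these pieces could fit together using a thread and a `threading.Event`. `InMemoryActions` and its `wait` helper are hypothetical, the "action" is just counting steps, and the signature is simplified (no `timeout_sec`).

```python
import threading
import time
from typing import Callable, Dict, List
from uuid import uuid4


class InMemoryActions:
    """Hypothetical in-process action runner for illustration only."""

    def __init__(self) -> None:
        self._cancel_events: Dict[str, threading.Event] = {}
        self._threads: Dict[str, threading.Thread] = {}

    def start_action(
        self,
        action_data: int,  # number of steps to run in this sketch
        target: str,
        on_feedback: Callable[[int], None],
        on_done: Callable[[bool], None],
    ) -> str:
        handle = str(uuid4())
        cancel = threading.Event()

        def run() -> None:
            completed = 0
            for _ in range(action_data):
                if cancel.is_set():
                    break
                time.sleep(0.01)  # simulate one unit of work
                completed += 1
                on_feedback(completed)
            on_done(completed == action_data)  # True only if not cancelled early

        thread = threading.Thread(target=run, daemon=True)
        self._cancel_events[handle] = cancel
        self._threads[handle] = thread
        thread.start()
        return handle

    def wait(self, action_handle: str) -> None:
        self._threads[action_handle].join()

    def terminate_action(self, action_handle: str) -> None:
        self._cancel_events[action_handle].set()
        self._threads[action_handle].join()


feedback: List[int] = []
results: List[bool] = []
actions = InMemoryActions()

short_handle = actions.start_action(3, "/count", feedback.append, results.append)
actions.wait(short_handle)  # runs to completion

long_handle = actions.start_action(1000, "/count", lambda _: None, results.append)
actions.terminate_action(long_handle)  # cancelled long before 1000 steps
```

Cancellation here is cooperative: the worker only notices the request between steps, which is a common design when the underlying work cannot be interrupted safely.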

unregister_callback(callback_id)

Unregisters a callback from a source.

Parameters:

Name | Type | Description | Default
callback_id | str | The id of the callback to unregister. | required

Raises:

Type | Description
ConnectorException | If the callback cannot be unregistered.

Source code in src/rai_core/rai/communication/base_connector.py
def unregister_callback(self, callback_id: str) -> None:
    """Unregisters a callback from a source.

    Parameters
    ----------
    callback_id : str
        The id of the callback to unregister.

    Raises
    ------
    ConnectorException
        If the callback cannot be unregistered.
    """
    if callback_id not in self.callback_id_mapping:
        raise ConnectorException(f"Callback with id {callback_id} not found.")

    source, _ = self.callback_id_mapping[callback_id]
    del self.registered_callbacks[source][callback_id]
    del self.callback_id_mapping[callback_id]

BaseConnector is the foundational interface that defines the common communication patterns:

  • Message passing (publish/subscribe)
  • Service calls (request/response)
  • Actions (long-running operations with feedback)
  • Callback registration for asynchronous notifications

HRIConnector[T]

HRIConnector class definition

rai.communication.hri_connector.HRIConnector

Bases: Generic[T], BaseConnector[T]

Base class for Human-Robot Interaction (HRI) connectors. Used for sending and receiving messages between a human and a robot over various sources.

Source code in src/rai_core/rai/communication/hri_connector.py
class HRIConnector(Generic[T], BaseConnector[T]):
    """
    Base class for Human-Robot Interaction (HRI) connectors.
    Used for sending and receiving messages between human and robot from various sources.
    """

    def build_message(
        self,
        message: LangchainBaseMessage | RAIMultimodalMessage,
        communication_id: Optional[str] = None,
        seq_no: int = 0,
        seq_end: bool = False,
    ) -> T:
        """
        Build a new message object from a given input message.

        Parameters
        ----------
        message : LangchainBaseMessage or RAIMultimodalMessage
            The source message to transform into the target type.
        communication_id : str, optional
            An optional identifier for the communication session. Defaults to `None`.
        seq_no : int, optional
            The sequence number of the message in the communication stream. Defaults to `0`.
        seq_end : bool, optional
            Flag indicating whether this message is the final one in the sequence. Defaults to `False`.

        Returns
        -------
        T
            A message instance of type `T` compatible with the Connector, created from the provided input.

        Notes
        -----
        This method uses `self.T_class.from_langchain` for conversion and assumes compatibility.
        """
        return self.T_class.from_langchain(message, communication_id, seq_no, seq_end)  # type: ignore

build_message(message, communication_id=None, seq_no=0, seq_end=False)

Build a new message object from a given input message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | BaseMessage or MultimodalMessage | The source message to transform into the target type. | required |
| communication_id | str | An optional identifier for the communication session. Defaults to None. | None |
| seq_no | int | The sequence number of the message in the communication stream. Defaults to 0. | 0 |
| seq_end | bool | Flag indicating whether this message is the final one in the sequence. Defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| T | A message instance of type T compatible with the Connector, created from the provided input. |

Notes

This method uses self.T_class.from_langchain for conversion and assumes compatibility.

Source code in src/rai_core/rai/communication/hri_connector.py
def build_message(
    self,
    message: LangchainBaseMessage | RAIMultimodalMessage,
    communication_id: Optional[str] = None,
    seq_no: int = 0,
    seq_end: bool = False,
) -> T:
    """
    Build a new message object from a given input message.

    Parameters
    ----------
    message : LangchainBaseMessage or RAIMultimodalMessage
        The source message to transform into the target type.
    communication_id : str, optional
        An optional identifier for the communication session. Defaults to `None`.
    seq_no : int, optional
        The sequence number of the message in the communication stream. Defaults to `0`.
    seq_end : bool, optional
        Flag indicating whether this message is the final one in the sequence. Defaults to `False`.

    Returns
    -------
    T
        A message instance of type `T` compatible with the Connector, created from the provided input.

    Notes
    -----
    This method uses `self.T_class.from_langchain` for conversion and assumes compatibility.
    """
    return self.T_class.from_langchain(message, communication_id, seq_no, seq_end)  # type: ignore

HRIConnector extends BaseConnector with Human-Robot Interaction capabilities:

  • Supports multimodal messages (text, images, audio)
  • Provides conversion to/from LangChain message formats
  • Handles message sequencing and conversation IDs
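
To make the sequencing parameters concrete, here is a minimal sketch of how `build_message` might be used to turn a list of text chunks into a numbered stream. It does not import RAI or LangChain: `TextMessage`, `ToyHRIMessage`, `ToyHRIConnector`, and `build_stream` are hypothetical stand-ins, and passing the message class to the constructor is an assumption about how `T_class` gets set. Only the `from_langchain(message, communication_id, seq_no, seq_end)` call shape is taken from the source above.

```python
from dataclasses import dataclass
from typing import Generic, List, Optional, Type, TypeVar


@dataclass
class TextMessage:
    """Stand-in for a LangChain message; carries only text (assumption)."""

    content: str


@dataclass
class ToyHRIMessage:
    """Hypothetical target message type with the sequencing fields."""

    text: str
    communication_id: Optional[str]
    seq_no: int
    seq_end: bool

    @classmethod
    def from_langchain(
        cls,
        message: TextMessage,
        communication_id: Optional[str] = None,
        seq_no: int = 0,
        seq_end: bool = False,
    ) -> "ToyHRIMessage":
        return cls(message.content, communication_id, seq_no, seq_end)


T = TypeVar("T", bound=ToyHRIMessage)


class ToyHRIConnector(Generic[T]):
    def __init__(self, t_class: Type[T]) -> None:
        # Assumption: the concrete message class is supplied explicitly.
        self.T_class = t_class

    def build_message(
        self,
        message: TextMessage,
        communication_id: Optional[str] = None,
        seq_no: int = 0,
        seq_end: bool = False,
    ) -> T:
        return self.T_class.from_langchain(message, communication_id, seq_no, seq_end)


def build_stream(
    connector: ToyHRIConnector[T], chunks: List[str], communication_id: str
) -> List[T]:
    """Number the chunks from 0 and flag only the last one with seq_end."""
    last = len(chunks) - 1
    return [
        connector.build_message(TextMessage(chunk), communication_id, i, i == last)
        for i, chunk in enumerate(chunks)
    ]
```

All messages in one stream share the same `communication_id`, so a receiver can group them and use `seq_end` to detect when the stream is complete.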

Concrete Implementations

| Connector | Description | Documentation Link |
| --- | --- | --- |
| ROS 2 Connectors | Robot Operating System 2 integration | ROS2 Connectors |
| Sound Device Connector | Audio streaming and playback/recording | Sound Device Connector |

Key Features

Message Types

Connectors are generic over message types derived from BaseMessage:

  • BaseMessage: Foundation message type with payload and metadata
  • ROS2Message: Message type for ROS 2 communication
  • HRIMessage: Multimodal message type with text, images, and audio
  • ROS2HRIMessage: HRIMessage specialized for ROS 2 transport
  • SoundDeviceMessage: Specialized message for audio operations

Communication Patterns

Connectors support multiple communication patterns:

  1. Publish/Subscribe

    • send_message(message, target, **kwargs): Send a message to a target
    • receive_message(source, timeout_sec, **kwargs): Receive a message from a source
    • register_callback(source, callback, **kwargs): Register for asynchronous notifications
  2. Request/Response

    • service_call(message, target, timeout_sec, **kwargs): Make a synchronous service call
  3. Actions

    • start_action(action_data, target, on_feedback, on_done, timeout_sec, **kwargs): Start a long-running action
    • terminate_action(action_handle, **kwargs): Cancel an ongoing action
    • create_action(action_name, generate_feedback_callback, **kwargs): Create an action server
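
The publish/subscribe pattern listed above can be sketched with a toy in-memory connector. This is not a RAI class: `InMemoryConnector` is hypothetical, and only the method names and argument order (`send_message(message, target)`, `receive_message(source, timeout_sec)`, `register_callback(source, callback)`) follow the list. Raising `TimeoutError` on an empty queue and invoking callbacks synchronously are simplifying assumptions.

```python
import queue
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class InMemoryConnector:
    """Hypothetical connector that routes messages through in-process queues."""

    def __init__(self) -> None:
        # One FIFO queue per target/source name.
        self._queues: DefaultDict[str, "queue.Queue[Any]"] = defaultdict(queue.Queue)
        self._callbacks: DefaultDict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def send_message(self, message: Any, target: str, **kwargs: Any) -> None:
        self._queues[target].put(message)
        # Notify subscribers; copy the list so callbacks may register others.
        for callback in list(self._callbacks[target]):
            callback(message)

    def receive_message(self, source: str, timeout_sec: float = 1.0, **kwargs: Any) -> Any:
        try:
            return self._queues[source].get(timeout=timeout_sec)
        except queue.Empty:
            raise TimeoutError(f"No message on {source!r} within {timeout_sec}s") from None

    def register_callback(
        self, source: str, callback: Callable[[Any], None], **kwargs: Any
    ) -> None:
        self._callbacks[source].append(callback)
```

A caller can either poll with `receive_message` or be notified through a registered callback; in this sketch both paths see every message sent to the same name.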

Threading Model

Connectors implement thread-safe operations:

  • ROS 2 connectors use a dedicated thread with MultiThreadedExecutor
  • Callbacks are executed in a ThreadPoolExecutor for concurrent processing
  • Access to shared resources is synchronized
  • All resources are released cleanly on shutdown
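
As a rough illustration of the callback side of this model, the sketch below dispatches callbacks through a `ThreadPoolExecutor`, guards the shared callback list with a lock, and waits for outstanding work on shutdown. `CallbackDispatcher` and its method names are hypothetical; the actual connectors may organize this differently.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class CallbackDispatcher:
    """Hypothetical helper: runs registered callbacks on a worker pool."""

    def __init__(self, max_workers: int = 4) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._callbacks: List[Callable[[Any], None]] = []

    def register(self, callback: Callable[[Any], None]) -> None:
        # The callback list is shared state, so mutate it under the lock.
        with self._lock:
            self._callbacks.append(callback)

    def dispatch(self, message: Any) -> List[Future]:
        # Snapshot under the lock, then submit outside it so that slow
        # callbacks never block registration.
        with self._lock:
            callbacks = list(self._callbacks)
        return [self._executor.submit(callback, message) for callback in callbacks]

    def shutdown(self) -> None:
        # wait=True blocks until all submitted callbacks have finished.
        self._executor.shutdown(wait=True)
```

Callbacks run concurrently, so any state they share with each other needs its own synchronization.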

Usage Examples

| Connector | Example Usage Documentation |
| --- | --- |
| ROS 2 | ROS2 Connectors |
| Sound Device | Sound Device Connector |

Error Handling

Connectors implement robust error handling:

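  • Operations that wait for a response (receive_message, service_call, start_action) take a timeout_sec parameter
  • Failures are raised as exceptions (for example, ConnectorException) and listed in each method's Raises section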
  • Callbacks are executed in a protected manner to prevent crashes
  • Resources are properly cleaned up during shutdown
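
One common way to run callbacks "in a protected manner" is to wrap each call in a `try`/`except`, log the failure, and continue with the remaining callbacks. The sketch below shows that idea under the assumption that a failing callback should not stop the others; `run_callbacks_protected` is a hypothetical helper, not part of the documented API.

```python
import logging
from typing import Any, Callable, Iterable, List

logger = logging.getLogger(__name__)


def run_callbacks_protected(
    callbacks: Iterable[Callable[[Any], None]], message: Any
) -> List[Exception]:
    """Call every callback; collect and log errors instead of propagating them."""
    errors: List[Exception] = []
    for callback in callbacks:
        try:
            callback(message)
        except Exception as exc:  # deliberately broad: user code may raise anything
            logger.exception("Callback %r failed", callback)
            errors.append(exc)
    return errors
```

Returning the collected exceptions lets the caller decide whether to ignore them, report them, or unregister the offending callback.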

See Also

  • Agents: For more information on the different types of agents in RAI
  • Aggregators: For more information on the different types of aggregators in RAI
  • Langchain Integration: For more information on the LangChain integration within RAI
  • Multimodal messages: For more information on the multimodal LangChain messages in RAI
  • Runners: For more information on the different types of runners in RAI